Author: rgodfrey
Date: Thu Apr 17 16:17:46 2014
New Revision: 1588301
URL: http://svn.apache.org/r1588301
Log:
QPID-5710 : [Java Broker] Remove AMQQueueFactory and QueueRegistry
Added:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
- copied, changed from r1588234,
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
Thu Apr 17 16:17:46 2014
@@ -163,7 +163,6 @@ public class BDBHAVirtualHost extends Ab
getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
removeHouseKeepingTasks();
- getQueueRegistry().close();
getDtxRegistry().close();
finalState = VirtualHostState.PASSIVE;
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Thu Apr 17 16:17:46 2014
@@ -313,7 +313,16 @@ public abstract class AbstractQueue<X ex
_logSubject = new QueueLogSubject(this);
- _virtualHost.getSecurityManager().authoriseCreateQueue(this);
+ try
+ {
+
+ _virtualHost.getSecurityManager().authoriseCreateQueue(this);
+ }
+ catch(AccessControlException e)
+ {
+ deleted();
+ throw e;
+ }
Subject activeSubject =
Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ?
Collections.<SessionPrincipal>emptySet() :
activeSubject.getPrincipals(SessionPrincipal.class);
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
Thu Apr 17 16:17:46 2014
@@ -31,7 +31,7 @@ public class LastValueQueueImpl extends
public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
- protected LastValueQueueImpl(VirtualHostImpl virtualHost,
+ public LastValueQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
super(virtualHost, attributes, entryList(attributes));
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
Thu Apr 17 16:17:46 2014
@@ -30,7 +30,7 @@ public class PriorityQueueImpl extends O
public static final int DEFAULT_PRIORITY_LEVELS = 10;
- protected PriorityQueueImpl(VirtualHostImpl virtualHost,
+ public PriorityQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
super(virtualHost, attributes, entryList(attributes));
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
Thu Apr 17 16:17:46 2014
@@ -44,7 +44,7 @@ public class SortedQueueImpl extends Out
}
- protected SortedQueueImpl(VirtualHostImpl virtualHost,
+ public SortedQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
this(virtualHost,
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Thu Apr 17 16:17:46 2014
@@ -43,6 +43,7 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -69,13 +70,13 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.LastValueQueue;
import org.apache.qpid.server.queue.LastValueQueueImpl;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.PriorityQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.PriorityQueueImpl;
import org.apache.qpid.server.queue.SortedQueue;
+import org.apache.qpid.server.queue.SortedQueueImpl;
+import org.apache.qpid.server.queue.StandardQueueImpl;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -91,12 +92,17 @@ import org.apache.qpid.server.store.hand
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>>
extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
IConnectionRegistry.RegistryChangeListener, EventListener,
VirtualHost<X,AMQQueue<?>, ExchangeImpl<?>>
{
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ private static final int MAX_LENGTH = 255;
+
private static final Logger _logger =
Logger.getLogger(AbstractVirtualHost.class);
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
@@ -107,12 +113,10 @@ public abstract class AbstractVirtualHos
private final Broker<?> _broker;
- private final QueueRegistry _queueRegistry;
-
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
- private final AMQQueueFactory _queueFactory;
+
private final SystemNodeRegistry _systemNodeRegistry = new
SystemNodeRegistry();
private volatile VirtualHostState _state = VirtualHostState.INITIALISING;
@@ -182,10 +186,6 @@ public abstract class AbstractVirtualHos
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
- _queueRegistry = new DefaultQueueRegistry(this);
-
- _queueFactory = new AMQQueueFactory(this, _queueRegistry);
-
_defaultDestination = new DefaultDestination(this);
}
@@ -419,11 +419,7 @@ public abstract class AbstractVirtualHos
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C>
clazz)
{
- if(clazz == Queue.class)
- {
- return (Collection<C>) getQueues();
- }
- else if(clazz == Connection.class)
+ if(clazz == Connection.class)
{
return (Collection<C>) getConnections();
}
@@ -567,15 +563,10 @@ public abstract class AbstractVirtualHos
return _createTime;
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
@Override
public AMQQueue<?> getQueue(String name)
{
- return _queueRegistry.getQueue(name);
+ return (AMQQueue<?>) getChildByName(Queue.class, name);
}
@Override
@@ -588,34 +579,31 @@ public abstract class AbstractVirtualHos
@Override
public AMQQueue<?> getQueue(UUID id)
{
- return _queueRegistry.getQueue(id);
+ return (AMQQueue<?>) getChildById(Queue.class, id);
}
@Override
public Collection<AMQQueue<?>> getQueues()
{
- return _queueRegistry.getQueues();
+ Collection children = getChildren(Queue.class);
+ return children;
}
@Override
public int removeQueue(AMQQueue<?> queue)
{
- synchronized (getQueueRegistry())
- {
- int purged = queue.delete();
+ int purged = queue.delete();
- getQueueRegistry().unregisterQueue(queue.getName());
- if (queue.isDurable() && !(queue.getLifetimePolicy()
- ==
LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || queue.getLifetimePolicy()
- ==
LifetimePolicy.DELETE_ON_SESSION_END))
- {
- DurableConfigurationStore store =
getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeQueue(store, queue);
- }
- return purged;
+ if (queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ DurableConfigurationStore store = getDurableConfigurationStore();
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
}
- }
+ return purged;
+}
public AMQQueue<?> createQueue(Map<String, Object> attributes) throws
QueueExistsException
{
@@ -656,36 +644,21 @@ public abstract class AbstractVirtualHos
}
}
- String queueName = MapValueConverter.getStringAttribute(Queue.NAME,
attributes);
-
- synchronized (_queueRegistry)
+ if(!attributes.containsKey(Queue.ID))
{
- if(_queueRegistry.getQueue(queueName) != null)
- {
- throw new QueueExistsException("Queue with name " + queueName
+ " already exists", _queueRegistry.getQueue(queueName));
- }
- if(!attributes.containsKey(Queue.ID))
- {
-
- UUID id = UUID.randomUUID();
- while(_queueRegistry.getQueue(id) != null)
- {
- id = UUID.randomUUID();
- }
- attributes.put(Queue.ID, id);
+ UUID id = UUID.randomUUID();
+ attributes.put(Queue.ID, id);
+ }
- }
- else
if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID,
attributes)) != null)
- {
- throw new QueueExistsException("Queue with id "
- +
MapValueConverter.getUUIDAttribute(Queue.ID,
-
attributes)
- + " already exists",
_queueRegistry.getQueue(queueName));
- }
+ boolean createDLQ = shouldCreateDLQ(attributes,
getDefaultDeadLetterQueueEnabled());
+ if (createDLQ)
+ {
+ // TODO - this isn't really correct - what if the name has ${foo}
in it?
+ validateDLNames(String.valueOf(attributes.get(Queue.NAME)));
+ }
+ return createOrRestoreQueue(attributes, true);
- return _queueFactory.createQueue(attributes);
- }
}
@@ -873,7 +846,6 @@ public abstract class AbstractVirtualHos
{
//Stop Connections
_connectionRegistry.close();
- _queueRegistry.close();
_dtxRegistry.close();
closeStorage();
shutdownHouseKeeping();
@@ -1127,7 +1099,7 @@ public abstract class AbstractVirtualHos
protected Map<String, DurableConfiguredObjectRecoverer>
getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this, _queueFactory),
+ new QueueRecoverer(this),
new ExchangeRecoverer(this),
new BindingRecoverer(this)
};
@@ -1178,7 +1150,7 @@ public abstract class AbstractVirtualHos
public void execute()
{
- for (AMQQueue<?> q : _queueRegistry.getQueues())
+ for (AMQQueue<?> q : getQueues())
{
if (_logger.isDebugEnabled())
{
@@ -1552,4 +1524,220 @@ public abstract class AbstractVirtualHos
return Collections.unmodifiableCollection(_aliases);
}
+
+ // TODO - remove
+ public AMQQueue restoreQueue(Map<String, Object> attributes)
+ {
+ return createOrRestoreQueue(attributes, false);
+
+ }
+
+
+ private AMQQueue createOrRestoreQueue(Map<String, Object> attributes,
boolean createInStore)
+ {
+ String queueName =
MapValueConverter.getStringAttribute(Queue.NAME,attributes);
+ boolean createDLQ = createInStore && shouldCreateDLQ(attributes,
getDefaultDeadLetterQueueEnabled());
+ if (createDLQ)
+ {
+ validateDLNames(queueName);
+ }
+
+ AMQQueue queue;
+
+ try
+ {
+
+
+ if (attributes.containsKey(SortedQueue.SORT_KEY))
+ {
+ queue = new SortedQueueImpl(this, attributes);
+ }
+ else if (attributes.containsKey(LastValueQueue.LVQ_KEY))
+ {
+ queue = new LastValueQueueImpl(this, attributes);
+ }
+ else if (attributes.containsKey(PriorityQueue.PRIORITIES))
+ {
+ queue = new PriorityQueueImpl(this, attributes);
+ }
+ else
+ {
+ queue = new StandardQueueImpl(this, attributes);
+ }
+ queue.open();
+ }
+ catch(DuplicateNameException e)
+ {
+
+ throw new QueueExistsException(e.getName(), getQueue(e.getName()));
+ }
+
+ if(createDLQ)
+ {
+ createDLQ(queue);
+ }
+ else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE)
instanceof String)
+ {
+
+ final String altExchangeAttr = (String)
attributes.get(Queue.ALTERNATE_EXCHANGE);
+ ExchangeImpl altExchange;
+ try
+ {
+ altExchange = getExchange(UUID.fromString(altExchangeAttr));
+ }
+ catch(IllegalArgumentException e)
+ {
+ altExchange = getExchange(altExchangeAttr);
+ }
+ queue.setAlternateExchange(altExchange);
+ }
+
+ if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
+ ==
LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ ||
queue.getLifetimePolicy()
+ ==
LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+
DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(),
queue);
+ }
+
+ return queue;
+ }
+
+
+
+ private void createDLQ(final AMQQueue queue)
+ {
+ final String queueName = queue.getName();
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
+
+ ExchangeImpl dlExchange = null;
+ final UUID dlExchangeId =
UUIDGenerator.generateExchangeUUID(dlExchangeName, getName());
+
+ try
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID,
dlExchangeId);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,
dlExchangeName);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE,
true);
+
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ false ? LifetimePolicy.DELETE_ON_NO_LINKS :
LifetimePolicy.PERMANENT);
+
attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ dlExchange = createExchange(attributes);
+ }
+ catch(ExchangeExistsException e)
+ {
+ // We're ok if the exchange already exists
+ dlExchange = e.getExistingExchange();
+ }
+ catch (ReservedExchangeNameException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an
alternate exchange for a queue failed",e);
+ }
+ catch (AMQUnknownExchangeType e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an
alternate exchange for a queue failed",e);
+ }
+ catch (UnknownExchangeException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an
alternate exchange for a queue failed",e);
+ }
+
+ AMQQueue dlQueue = null;
+
+ {
+ dlQueue = getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ-ing/MDC from the DLQ itself,
preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
+
+ try
+ {
+
+
+ args.put(Queue.ID, UUID.randomUUID());
+ args.put(Queue.NAME, dlQueueName);
+ args.put(Queue.DURABLE, true);
+ dlQueue = createQueue(args);
+ }
+ catch (QueueExistsException e)
+ {
+ // TODO - currently theoretically for two threads to be
creating a queue at the same time.
+ // All model changing operations should be moved to the
task executor of the virtual host
+ }
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout
exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY,
dlQueue, null);
+ }
+ queue.setAlternateExchange(dlExchange);
+ }
+
+ private static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" +
exchangeName
+ + "' length exceeds limit of "
+ MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName
+ "' length exceeds limit of "
+ + MAX_LENGTH + " characters for
queue " + name);
+ }
+ }
+
+ private static boolean shouldCreateDLQ(Map<String, Object> arguments,
boolean virtualHostDefaultDeadLetterQueueEnabled)
+ {
+ boolean autoDelete =
MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+
Queue.LIFETIME_POLICY,
+ arguments,
+
LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+
+ //feature is not to be enabled for temporary queues or when explicitly
disabled by argument
+ if (!(autoDelete || (arguments != null &&
arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
+ {
+ boolean dlqArgumentPresent = arguments != null
+ &&
arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
+ if (dlqArgumentPresent)
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument =
arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ dlqEnabled = (argument instanceof Boolean &&
((Boolean)argument).booleanValue())
+ || (argument instanceof String &&
Boolean.parseBoolean(argument.toString()));
+ }
+ return dlqEnabled;
+ }
+ return virtualHostDefaultDeadLetterQueueEnabled;
+ }
+ return false;
+ }
+
+ private static String getDeadLetterQueueName(String name)
+ {
+ return name +
System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX,
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
+ private static String getDeadLetterExchangeName(String name)
+ {
+ return name +
System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX,
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
Thu Apr 17 16:17:46 2014
@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedDependency;
@@ -41,13 +40,10 @@ public class QueueRecoverer extends Abst
{
private static final Logger _logger =
Logger.getLogger(QueueRecoverer.class);
private final VirtualHostImpl _virtualHost;
- private final QueueFactory _queueFactory;
- public QueueRecoverer(final VirtualHostImpl virtualHost,
- final QueueFactory queueFactory)
+ public QueueRecoverer(final VirtualHostImpl virtualHost)
{
_virtualHost = virtualHost;
- _queueFactory = queueFactory;
}
@Override
@@ -110,7 +106,7 @@ public class QueueRecoverer extends Abst
Map<String, Object> attributes = new LinkedHashMap<String,
Object>(_attributes);
attributes.put(Queue.ID, _id);
attributes.put(Queue.DURABLE, true);
- _queue = _queueFactory.restoreQueue(attributes);
+ _queue = _virtualHost.restoreQueue(attributes);
}
return _queue;
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
Thu Apr 17 16:17:46 2014
@@ -127,4 +127,7 @@ public interface VirtualHostImpl< X exte
TaskExecutor getTaskExecutor();
EventLogger getEventLogger();
+
+ // TODO - remove
+ public AMQQueue restoreQueue(Map<String, Object> attributes);
}
Modified:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
Thu Apr 17 16:17:46 2014
@@ -60,7 +60,6 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
@@ -83,7 +82,6 @@ public class DurableConfigurationRecover
private DurableConfigurationRecoverer _durableConfigurationRecoverer;
private VirtualHostImpl _vhost;
private DurableConfigurationStore _store;
- private QueueFactory _queueFactory;
private ConfiguredObjectFactory _configuredObjectFactory;
private ConfiguredObjectTypeFactory _exchangeFactory;
@@ -135,9 +133,7 @@ public class DurableConfigurationRecover
final ArgumentCaptor<Map> attributesArg =
ArgumentCaptor.forClass(Map.class);
- _queueFactory = mock(QueueFactory.class);
-
- when(_queueFactory.restoreQueue(attributesArg.capture())).then(
+ when(_vhost.restoreQueue(attributesArg.capture())).then(
new Answer()
{
@@ -184,7 +180,7 @@ public class DurableConfigurationRecover
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _queueFactory),
+ new QueueRecoverer(_vhost),
new ExchangeRecoverer(_vhost),
new BindingRecoverer(_vhost)
};
Modified:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
Thu Apr 17 16:17:46 2014
@@ -652,6 +652,12 @@ public class MockVirtualHost implements
}
@Override
+ public AMQQueue restoreQueue(final Map<String, Object> attributes)
+ {
+ return null;
+ }
+
+ @Override
public boolean getDefaultDeadLetterQueueEnabled()
{
return false;
Copied:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
(from r1588234,
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java?p2=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java&p1=qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java&r1=1588234&r2=1588301&rev=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
Thu Apr 17 16:17:46 2014
@@ -18,158 +18,65 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.virtualhost;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueueImpl;
+import org.apache.qpid.server.queue.StandardQueueImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQQueueFactoryTest extends QpidTestCase
+public class VirtualHostQueueCreationTest extends QpidTestCase
{
- private QueueRegistry _queueRegistry;
private VirtualHostImpl _virtualHost;
- private AMQQueueFactory _queueFactory;
- private List<AMQQueue> _queues;
+ private Broker _broker;
@Override
public void setUp() throws Exception
{
super.setUp();
- _queues = new ArrayList<AMQQueue>();
-
- _virtualHost = mock(VirtualHostImpl.class);
-
when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
- when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-
- DurableConfigurationStore store =
mock(DurableConfigurationStore.class);
- when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
-
- mockExchangeCreation();
- mockQueueRegistry();
- delegateVhostQueueCreation();
-
- when(_virtualHost.getQueues()).thenReturn(_queues);
-
-
- _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry);
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
+ _virtualHost = createHost();
+ _virtualHost.open();
- }
-
- private void delegateVhostQueueCreation() throws Exception
- {
-
- final ArgumentCaptor<Map> attributes =
ArgumentCaptor.forClass(Map.class);
- when(_virtualHost.createQueue(attributes.capture())).then(
- new Answer<AMQQueue>()
- {
- @Override
- public AMQQueue answer(InvocationOnMock invocation) throws
Throwable
- {
- return
_queueFactory.createQueue(attributes.getValue());
- }
- }
- );
}
-
- private void mockQueueRegistry()
+ private VirtualHostImpl createHost()
{
- _queueRegistry = mock(QueueRegistry.class);
-
- final ArgumentCaptor<AMQQueue> capturedQueue =
ArgumentCaptor.forClass(AMQQueue.class);
- doAnswer(new Answer()
- {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.TYPE, StandardVirtualHost.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS,
Collections.singletonMap(MessageStore.STORE_TYPE,
+
TestMemoryMessageStore.TYPE));
- @Override
- public Object answer(final InvocationOnMock invocation) throws
Throwable
- {
- AMQQueue queue = capturedQueue.getValue();
-
when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue);
-
when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue);
-
when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue);
-
when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue);
- _queues.add(queue);
-
- return null;
- }
- }).when(_queueRegistry).registerQueue(capturedQueue.capture());
- }
-
- private void mockExchangeCreation() throws Exception
- {
- final ArgumentCaptor<Map> attributes =
ArgumentCaptor.forClass(Map.class);
-
-
- when(_virtualHost.createExchange(attributes.capture())).then(
- new Answer<ExchangeImpl>()
- {
- @Override
- public ExchangeImpl answer(InvocationOnMock invocation)
throws Throwable
- {
- Map attributeValues = attributes.getValue();
- final String name =
MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME,
attributeValues);
- final UUID id =
MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID,
attributeValues);
-
- final ExchangeImpl exchange = mock(ExchangeImpl.class);
- ExchangeType exType = mock(ExchangeType.class);
-
- when(exchange.getName()).thenReturn(name);
- when(exchange.getId()).thenReturn(id);
- when(exchange.getExchangeType()).thenReturn(exType);
-
- final String typeName =
MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE,
attributeValues);
- when(exType.getType()).thenReturn(typeName);
- when(exchange.getTypeName()).thenReturn(typeName);
-
-
when(_virtualHost.getExchange(eq(name))).thenReturn(exchange);
-
when(_virtualHost.getExchange(eq(id))).thenReturn(exchange);
-
- final ArgumentCaptor<AMQQueue> queue =
ArgumentCaptor.forClass(AMQQueue.class);
-
- when(exchange.addBinding(anyString(), queue.capture(),
anyMap())).then(new Answer<Boolean>()
- {
-
- @Override
- public Boolean answer(InvocationOnMock invocation)
throws Throwable
- {
-
when(exchange.isBound(eq(queue.getValue()))).thenReturn(true);
- return true;
- }
- });
-
- return exchange;
- }
- }
- );
+ attributes = new HashMap<String, Object>(attributes);
+ attributes.put(VirtualHost.ID, UUID.randomUUID());
+ return new StandardVirtualHost(attributes, _broker);
}
@Override
@@ -198,7 +105,7 @@ public class AMQQueueFactoryTest extends
attributes.put(PriorityQueue.PRIORITIES, 5);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue not a priority queue", PriorityQueueImpl.class,
queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -209,14 +116,14 @@ public class AMQQueueFactoryTest extends
public void testSimpleQueueRegistration() throws Exception
{
String queueName = getName();
- String dlQueueName = queueName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue not a simple queue", StandardQueueImpl.class,
queue.getClass());
verifyQueueRegistered(queueName);
@@ -229,7 +136,7 @@ public class AMQQueueFactoryTest extends
}
/**
- * Tests that setting the {@link
QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
+ * Tests that setting the {@link
org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED}
argument true does
* cause the alternate exchange to be set and DLQ to be produced.
*/
public void testDeadLetterQueueEnabled() throws Exception
@@ -237,7 +144,7 @@ public class AMQQueueFactoryTest extends
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist",
_virtualHost.getExchange(dlExchangeName));
@@ -248,7 +155,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
ExchangeImpl altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is
enabled", altExchange);
@@ -277,7 +184,7 @@ public class AMQQueueFactoryTest extends
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist",
_virtualHost.getExchange(dlExchangeName));
@@ -288,7 +195,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Unexpected maximum delivery count", 5,
queue.getMaximumDeliveryAttempts());
ExchangeImpl altExchange = queue.getAlternateExchange();
@@ -310,7 +217,7 @@ public class AMQQueueFactoryTest extends
}
/**
- * Tests that setting the {@link
QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
+ * Tests that setting the {@link
org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED}
argument false does not
* result in the alternate exchange being set and DLQ being created.
*/
public void testDeadLetterQueueDisabled() throws Exception
@@ -320,7 +227,7 @@ public class AMQQueueFactoryTest extends
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist",
_virtualHost.getExchange(dlExchangeName));
@@ -329,7 +236,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertNull("Queue should not have an alternate exchange as DLQ is
disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist",
_virtualHost.getExchange(dlExchangeName));
@@ -341,7 +248,7 @@ public class AMQQueueFactoryTest extends
}
/**
- * Tests that setting the {@link
QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
+ * Tests that setting the {@link
org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED}
argument true but
* creating an auto-delete queue, does not result in the alternate exchange
* being set and DLQ being created.
*/
@@ -350,7 +257,7 @@ public class AMQQueueFactoryTest extends
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName +
VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist",
_virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist",
_virtualHost.getExchange(dlExchangeName));
@@ -363,7 +270,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.LIFETIME_POLICY,
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
//create an autodelete queue
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue should be autodelete",
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
queue.getLifetimePolicy());
@@ -378,7 +285,7 @@ public class AMQQueueFactoryTest extends
}
/**
- * Tests that setting the {@link
QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * Tests that setting the {@link
org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT}
argument has
* the desired effect.
*/
public void testMaximumDeliveryCount() throws Exception
@@ -389,7 +296,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = _queueFactory.createQueue(attributes);
+ final AMQQueue queue = _virtualHost.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5,
queue.getMaximumDeliveryAttempts());
@@ -398,7 +305,7 @@ public class AMQQueueFactoryTest extends
}
/**
- * Tests that omitting the {@link
QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * Tests that omitting the {@link
org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT}
argument means
* that queue is created with a default maximumDeliveryCount of zero
(unless set in config).
*/
public void testMaximumDeliveryCountDefault() throws Exception
@@ -407,7 +314,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
- final AMQQueue queue = _queueFactory.createQueue(attributes);
+ final AMQQueue queue = _virtualHost.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0,
queue.getMaximumDeliveryAttempts());
@@ -425,7 +332,7 @@ public class AMQQueueFactoryTest extends
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -454,7 +361,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with DLQ name having more than 255 characters can not
be created!");
}
catch (Exception e)
@@ -484,7 +391,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with DLE name having more than 255 characters can not
be created!");
}
catch (Exception e)
@@ -504,7 +411,7 @@ public class AMQQueueFactoryTest extends
attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
assertEquals(Boolean.TRUE,
queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java?rev=1588301&r1=1588300&r2=1588301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
Thu Apr 17 16:17:46 2014
@@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -305,11 +305,11 @@ public class MaxDeliveryCountTest extend
AMQDestination checkQueueDLQ;
if(durableSub)
{
- checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" +
getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" +
getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
}
else
{
- checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
}
assertEquals("The DLQ should have " + expected + " msgs on it",
expected,
@@ -323,12 +323,12 @@ public class MaxDeliveryCountTest extend
MessageConsumer consumer;
if(durableSub)
{
- consumer =
clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ consumer =
clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
}
else
{
consumer = clientSession.createConsumer(
- clientSession.createQueue(destName +
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ clientSession.createQueue(destName +
AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
}
//keep track of the message we expect to still be on the DLQ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]