Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Fri Dec 11 15:50:19 2015 @@ -42,12 +42,13 @@ import org.apache.qpid.server.filter.Fil import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; @ManagedObject( category = false, type = ExchangeDefaults.TOPIC_EXCHANGE_CLASS ) public class TopicExchange extends AbstractExchange<TopicExchange> @@ -59,19 +60,19 @@ public class TopicExchange extends Abstr private final Map<String, TopicExchangeResult> _topicExchangeResults = new ConcurrentHashMap<String, TopicExchangeResult>(); - private final Map<BindingImpl, Map<String,Object>> _bindings = new HashMap<BindingImpl, Map<String,Object>>(); + private final Map<Binding<?>, Map<String,Object>> _bindings = new HashMap<>(); @ManagedObjectFactoryConstructor - public TopicExchange(final Map<String,Object> attributes, final VirtualHostImpl vhost) + public TopicExchange(final Map<String,Object> attributes, final VirtualHost<?> vhost) { super(attributes, vhost); } @Override - protected synchronized void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments) + protected synchronized void onBindingUpdated(final Binding<?> binding, final Map<String, Object> oldArguments) { final String bindingKey = binding.getBindingKey(); - AMQQueue queue = binding.getAMQQueue(); + Queue<?> queue = binding.getQueue(); Map<String,Object> args = binding.getArguments(); assert queue != null; @@ -129,10 +130,10 @@ public class TopicExchange extends Abstr } - protected synchronized void registerQueue(final BindingImpl binding) throws AMQInvalidArgumentException + protected synchronized void registerQueue(final Binding<?> binding) throws AMQInvalidArgumentException { final String bindingKey = binding.getBindingKey(); - AMQQueue queue = binding.getAMQQueue(); + Queue<?> queue = binding.getQueue(); Map<String,Object> args = binding.getArguments(); assert queue != null; @@ -225,7 +226,7 @@ public class TopicExchange extends Abstr ? "" : routingAddress; - final Collection<AMQQueue> matchedQueues = + final Collection<Queue<?>> matchedQueues = getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); ArrayList<BaseQueue> queues; @@ -249,7 +250,7 @@ public class TopicExchange extends Abstr } - private synchronized boolean deregisterQueue(final BindingImpl binding) + private synchronized boolean deregisterQueue(final Binding<?> binding) { if(_bindings.containsKey(binding)) { @@ -266,8 +267,8 @@ public class TopicExchange extends Abstr { try { - result.removeFilteredQueue(binding.getAMQQueue(), FilterSupport.createMessageFilter(bindingArgs, - binding.getAMQQueue())); + result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs, + binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -276,7 +277,7 @@ public class TopicExchange extends Abstr } else { - result.removeUnfilteredQueue(binding.getAMQQueue()); + result.removeUnfilteredQueue(binding.getQueue()); } return true; } @@ -286,7 +287,7 @@ public class TopicExchange extends Abstr } } - private Collection<AMQQueue> getMatchedQueues(Filterable message, String routingKey) + private Collection<Queue<?>> getMatchedQueues(Filterable message, String routingKey) { Collection<TopicMatcherResult> results = _parser.parse(routingKey); @@ -299,12 +300,12 @@ public class TopicExchange extends Abstr results.toArray(resultQueues); return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null); default: - Collection<AMQQueue> queues = new HashSet<AMQQueue>(); + Collection<Queue<?>> queues = new HashSet<>(); for(TopicMatcherResult result : results) { TopicExchangeResult res = (TopicExchangeResult)result; - for(BindingImpl b : res.getBindings()) + for(Binding<?> b : res.getBindings()) { b.incrementMatches(); } @@ -317,7 +318,7 @@ public class TopicExchange extends Abstr } - protected void onBind(final BindingImpl binding) + protected void onBind(final Binding<?> binding) { try { @@ -330,7 +331,7 @@ public class TopicExchange extends Abstr } } - protected void onUnbind(final BindingImpl binding) + protected void onUnbind(final Binding<?> binding) { deregisterQueue(binding); }
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Fri Dec 11 15:50:19 2015 @@ -30,25 +30,25 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Queue; public final class TopicExchangeResult implements TopicMatcherResult { - private final List<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>(); - private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); - private final ConcurrentMap<AMQQueue, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>(); - private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0); + private final List<Binding<?>> _bindings = new CopyOnWriteArrayList<>(); + private final Map<Queue<?>, Integer> _unfilteredQueues = new ConcurrentHashMap<>(); + private final ConcurrentMap<Queue<?>, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>(); + private volatile ArrayList<Queue<?>> _unfilteredQueueList = new ArrayList<>(0); - public void addUnfilteredQueue(AMQQueue queue) + public void addUnfilteredQueue(Queue<?> queue) { Integer instances = _unfilteredQueues.get(queue); if(instances == null) { _unfilteredQueues.put(queue, 1); - ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList); + ArrayList<Queue<?>> newList = new ArrayList<>(_unfilteredQueueList); newList.add(queue); _unfilteredQueueList = newList; } @@ -58,13 +58,13 @@ public final class TopicExchangeResult i } } - public void removeUnfilteredQueue(AMQQueue queue) + public void removeUnfilteredQueue(Queue<?> queue) { Integer instances = _unfilteredQueues.get(queue); if(instances == 1) { _unfilteredQueues.remove(queue); - ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList); + ArrayList<Queue<?>> newList = new ArrayList<>(_unfilteredQueueList); newList.remove(queue); _unfilteredQueueList = newList; @@ -76,27 +76,27 @@ public final class TopicExchangeResult i } - public Collection<AMQQueue> getUnfilteredQueues() + public Collection<Queue<?>> getUnfilteredQueues() { return _unfilteredQueues.keySet(); } - public void addBinding(BindingImpl binding) + public void addBinding(Binding<?> binding) { _bindings.add(binding); } - public void removeBinding(BindingImpl binding) + public void removeBinding(Binding<?> binding) { _bindings.remove(binding); } - public List<BindingImpl> getBindings() + public List<Binding<?>> getBindings() { return new ArrayList<>(_bindings); } - public void addFilteredQueue(AMQQueue queue, FilterManager filter) + public void addFilteredQueue(Queue<?> queue, FilterManager filter) { Map<FilterManager,Integer> filters = _filteredQueues.get(queue); if(filters == null) @@ -116,7 +116,7 @@ public final class TopicExchangeResult i } - public void removeFilteredQueue(AMQQueue queue, FilterManager filter) + public void removeFilteredQueue(Queue<?> queue, FilterManager filter) { Map<FilterManager,Integer> filters = _filteredQueues.get(queue); if(filters != null) @@ -142,7 +142,7 @@ public final class TopicExchangeResult i } - public void replaceQueueFilter(AMQQueue queue, + public void replaceQueueFilter(Queue<?> queue, FilterManager oldFilter, FilterManager newFilter) { @@ -169,7 +169,7 @@ public final class TopicExchangeResult i _filteredQueues.put(queue,newFilters); } - public Collection<AMQQueue> processMessage(Filterable msg, Collection<AMQQueue> queues) + public Collection<Queue<?>> processMessage(Filterable msg, Collection<Queue<?>> queues) { if(queues == null) { @@ -179,18 +179,18 @@ public final class TopicExchangeResult i } else { - queues = new HashSet<AMQQueue>(); + queues = new HashSet<Queue<?>>(); } } else if(!(queues instanceof Set)) { - queues = new HashSet<AMQQueue>(queues); + queues = new HashSet<Queue<?>>(queues); } queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { - for(Map.Entry<AMQQueue, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet()) + for(Map.Entry<Queue<?>, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet()) { if(!queues.contains(entry.getKey())) { Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Fri Dec 11 15:50:19 2015 @@ -33,8 +33,8 @@ import org.apache.qpid.filter.selector.P import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport { @@ -95,7 +95,7 @@ public class FilterSupport && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; } - public static FilterManager createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + public static FilterManager createMessageFilter(final Map<String,Object> args, Queue<?> queue) throws AMQInvalidArgumentException { FilterManager filterManager = null; if(argumentsContainNoLocal(args)) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractVirtualHostLogger.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractVirtualHostLogger.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractVirtualHostLogger.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractVirtualHostLogger.java Fri Dec 11 15:50:19 2015 @@ -30,9 +30,9 @@ import org.apache.qpid.server.model.Virt public abstract class AbstractVirtualHostLogger <X extends AbstractVirtualHostLogger<X>> extends AbstractLogger<X> implements VirtualHostLogger<X> { - private final VirtualHost<?, ?, ?> _virtualHost; + private final VirtualHost<?> _virtualHost; - protected AbstractVirtualHostLogger(Map<String, Object> attributes, VirtualHost<?,?,?> virtualHost) + protected AbstractVirtualHostLogger(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); _virtualHost = virtualHost; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostFileLoggerImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostFileLoggerImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostFileLoggerImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostFileLoggerImpl.java Fri Dec 11 15:50:19 2015 @@ -21,7 +21,6 @@ package org.apache.qpid.server.logging; import java.io.File; -import java.security.AccessControlException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -67,7 +66,7 @@ public class VirtualHostFileLoggerImpl e private boolean _safeMode; @ManagedObjectFactoryConstructor - protected VirtualHostFileLoggerImpl(final Map<String, Object> attributes, VirtualHost<?,?,?> virtualHost) + protected VirtualHostFileLoggerImpl(final Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostSyslogLoggerImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostSyslogLoggerImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostSyslogLoggerImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/VirtualHostSyslogLoggerImpl.java Fri Dec 11 15:50:19 2015 @@ -44,7 +44,7 @@ public class VirtualHostSyslogLoggerImpl private boolean _throwableExcluded; @ManagedObjectFactoryConstructor - protected VirtualHostSyslogLoggerImpl(final Map<String, Object> attributes, VirtualHost<?,?,?> virtualHost) + protected VirtualHostSyslogLoggerImpl(final Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Fri Dec 11 15:50:19 2015 @@ -22,9 +22,9 @@ package org.apache.qpid.server.logging.s import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.BINDING_FORMAT; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; public class BindingLogSubject extends AbstractLogSubject { @@ -38,10 +38,10 @@ public class BindingLogSubject extends A * @param exchange * @param queue */ - public BindingLogSubject(String routingKey, ExchangeImpl exchange, - AMQQueue queue) + public BindingLogSubject(String routingKey, Exchange<?> exchange, + Queue<?> queue) { - VirtualHostImpl virtualHost = queue.getVirtualHost(); + VirtualHost<?> virtualHost = queue.getVirtualHost(); setLogStringWithFormat(BINDING_FORMAT, virtualHost.getName(), exchange.getType(), Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java Fri Dec 11 15:50:19 2015 @@ -22,14 +22,14 @@ package org.apache.qpid.server.logging.s import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.EXCHANGE_FORMAT; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.VirtualHost; public class ExchangeLogSubject extends AbstractLogSubject { /** Create an ExchangeLogSubject that Logs in the following format. */ - public ExchangeLogSubject(ExchangeImpl exchange, VirtualHostImpl vhost) + public ExchangeLogSubject(Exchange<?> exchange, VirtualHost<?> vhost) { setLogStringWithFormat(EXCHANGE_FORMAT, vhost.getName(), exchange.getType(), exchange.getName()); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java Fri Dec 11 15:50:19 2015 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.model.Queue; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; @@ -28,7 +28,7 @@ public class QueueLogSubject extends Abs { /** Create an QueueLogSubject that Logs in the following format. */ - public QueueLogSubject(AMQQueue queue) + public QueueLogSubject(Queue<?> queue) { setLogStringWithFormat(QUEUE_FORMAT, queue.getVirtualHost().getName(), Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java Fri Dec 11 15:50:19 2015 @@ -31,6 +31,8 @@ public interface Binding<X extends Bindi public String QUEUE = "queue"; public String EXCHANGE = "exchange"; + String getBindingKey(); + // TODO - this is a hack @DerivedAttribute Queue<?> getQueue(); @@ -45,4 +47,5 @@ public interface Binding<X extends Bindi @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Matches") long getMatches(); + void incrementMatches(); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Fri Dec 11 15:50:19 2015 @@ -177,7 +177,7 @@ public interface Broker<X extends Broker */ SecurityManager getSecurityManager(); - VirtualHost<?,?,?> findVirtualHostByName(String name); + VirtualHost<?> findVirtualHostByName(String name); VirtualHostNode findDefautVirtualHostNode(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Fri Dec 11 15:50:19 2015 @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.model; +import org.apache.qpid.server.consumer.ConsumerImpl; + @ManagedObject -public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X> +public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, ConsumerImpl { public String DISTRIBUTION_MODE = "distributionMode"; public String EXCLUSIVE = "exclusive"; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Fri Dec 11 15:50:19 2015 @@ -25,10 +25,13 @@ import java.util.Map; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.qpid.server.exchange.ExchangeReferrer; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageDestination; @ManagedObject( description = Exchange.CLASS_DESCRIPTION ) -public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination +public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination, + ExchangeReferrer { String CLASS_DESCRIPTION = "<p>An Exchange is a named entity within the Virtualhost which receives messages from " + "producers and routes them to matching Queues within the Virtualhost.</p>" @@ -69,4 +72,70 @@ public interface Exchange<X extends Exch Map<String,Object> bindingArguments, Map<String, Object> attributes); + /** + * @return true if the exchange will be deleted after all queues have been detached + */ + boolean isAutoDelete(); + + boolean addBinding(String bindingKey, Queue<?> queue, Map<String, Object> arguments); + + boolean deleteBinding(String bindingKey, Queue<?> queue); + + boolean hasBinding(String bindingKey, Queue<?> queue); + + boolean replaceBinding(String bindingKey, + Queue<?> queue, + Map<String, Object> arguments); + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments + * @param bindingKey + * @param arguments + * @param queue + * @return + */ + + boolean isBound(String bindingKey, Map<String, Object> arguments, Queue<?> queue); + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key + * @param bindingKey + * @param queue + * @return + */ + + boolean isBound(String bindingKey, Queue<?> queue); + + /** + * Determines whether a message is routing to any queue using a specific _routing key + * @param bindingKey + * @return + */ + boolean isBound(String bindingKey); + + /** + * Returns true if this exchange has at least one binding associated with it. + * @return + */ + boolean hasBindings(); + + boolean isBound(Queue<?> queue); + + boolean isBound(Map<String, Object> arguments); + + boolean isBound(String bindingKey, Map<String, Object> arguments); + + boolean isBound(Map<String, Object> arguments, Queue<?> queue); + + void removeReference(ExchangeReferrer exchange); + + void addReference(ExchangeReferrer exchange); + + boolean hasReferrers(); + + ListenableFuture<Void> removeBindingAsync(Binding<?> binding); + + EventLogger getEventLogger(); + + void addBinding(Binding<?> binding); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Dec 11 15:50:19 2015 @@ -23,15 +23,34 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.qpid.server.exchange.ExchangeReferrer; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInfo; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.NotificationCheck; +import org.apache.qpid.server.queue.QueueConsumer; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.util.Deletable; @ManagedObject( defaultType = "standard", description = Queue.CLASS_DESCRIPTION ) -public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> +public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>, + Comparable<X>, ExchangeReferrer, + BaseQueue, + MessageSource, + CapacityChecker, + MessageDestination, + Deletable<X> { String CLASS_DESCRIPTION = "<p>Queues are named entities within a VirtualHost that hold/buffer messages for later " + "delivery to consumer applications. Consumers subscribe to a queue in order to receive " @@ -174,10 +193,10 @@ public interface Queue<X extends Queue<X Map<String, Map<String,List<String>>> getDefaultFilters(); //children - Collection<? extends Binding> getBindings(); + Collection<? extends Binding<?>> getBindings(); - Collection<? extends Consumer> getConsumers(); + Collection<? extends Consumer<?>> getConsumers(); //operations @@ -259,4 +278,67 @@ public interface Queue<X extends Queue<X @ManagedOperation(nonModifying = true) MessageInfo getMessageInfoById(@Param(name = "messageId") long messageId); + boolean isExclusive(); + + void addBinding(Binding<?> binding); + + void removeBinding(Binding<?> binding); + + LogSubject getLogSubject(); + + VirtualHost<?> getVirtualHost(); + + boolean isUnused(); + + boolean isEmpty(); + + long getOldestMessageArrivalTime(); + + void requeue(QueueEntry entry); + + void dequeue(QueueEntry entry); + + void decrementUnackedMsgCount(QueueEntry queueEntry); + + void incrementUnackedMsgCount(QueueEntry entry); + + boolean resend(QueueEntry entry, QueueConsumer<?> consumer); + + List<? extends QueueEntry> getMessagesOnTheQueue(); + + List<Long> getMessagesOnTheQueue(int num); + + List<Long> getMessagesOnTheQueue(int num, int offset); + + QueueEntry getMessageOnTheQueue(long messageId); + + /** + * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. + * + * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. + * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. + * @param fromPosition + * @param toPosition + * @return + */ + List<? extends QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition); + + /** + * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc. + */ + void checkMessageStatus(); + + Set<NotificationCheck> getNotificationChecks(); + + void deliverAsync(); + + Collection<String> getAvailableAttributes(); + + void completeRecovery(); + + void recover(ServerMessage<?> message, MessageEnqueueRecord enqueueRecord); + + void setTargetSize(long targetSize); + + long getPotentialMemoryFootprint(); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Fri Dec 11 15:50:19 2015 @@ -26,18 +26,29 @@ import java.security.Principal; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; -import org.apache.qpid.server.message.MessageInstance; +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.logging.EventLoggerProvider; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; import org.apache.qpid.server.virtualhost.VirtualHostConnectionListener; -import javax.security.auth.Subject; - @ManagedObject( defaultType = "ProvidedStore", description = VirtualHost.CLASS_DESCRIPTION) -public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X> +public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<X>, StatisticsGatherer, + EventLoggerProvider { String CLASS_DESCRIPTION = "<p>A virtualhost is a namespace in which messaging is performed. Virtualhosts are " + "independent; the messaging goes on a within a virtualhost is independent of any " @@ -69,6 +80,7 @@ public interface VirtualHost<X extends V @ManagedContextDefault( name = "queue.deadLetterQueueEnabled") public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; + String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; @ManagedAttribute( defaultValue = "${queue.deadLetterQueueEnabled}") boolean isQueue_deadLetterQueueEnabled(); @@ -177,13 +189,13 @@ public interface VirtualHost<X extends V //children Collection<VirtualHostAlias> getAliases(); - Collection<Q> getQueues(); - Collection<E> getExchanges(); + Collection<Queue<?>> getQueues(); + Collection<Exchange<?>> getExchanges(); - E createExchange(Map<String, Object> attributes) + Exchange<?> createExchange(Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException; - Q createQueue(Map<String, Object> attributes) + Queue<?> createQueue(Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException; Collection<String> getExchangeTypeNames(); @@ -205,13 +217,45 @@ public interface VirtualHost<X extends V void registerConnection(AMQPConnection<?> connection); void deregisterConnection(AMQPConnection<?> connection); + Queue<?> getAttainedQueue(String name); + + Queue<?> getAttainedQueue(UUID id); + + MessageSource getAttainedMessageSource(String name); + + int removeQueue(Queue<?> queue); + + ListenableFuture<Integer> removeQueueAsync(Queue<?> queue); + + Exchange getAttainedExchange(String name); + + MessageDestination getAttainedMessageDestination(String name); + + MessageDestination getDefaultDestination(); + + DurableConfigurationStore getDurableConfigurationStore(); + + SecurityManager getSecurityManager(); + + void scheduleHouseKeepingTask(long period, HouseKeepingTask task); + + DtxRegistry getDtxRegistry(); + + LinkRegistry getLinkRegistry(String remoteContainerId); + + ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); + + boolean authoriseCreateConnection(AMQPConnection<?> connection); + + String getLocalAddress(String routingAddress); + interface Transaction { void dequeue(QueueEntry entry); - void copy(QueueEntry entry, Queue queue); + void copy(QueueEntry entry, Queue<?> queue); - void move(QueueEntry entry, Queue queue); + void move(QueueEntry entry, Queue<?> queue); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Fri Dec 11 15:50:19 2015 @@ -46,7 +46,7 @@ public interface VirtualHostNode<X exten @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_BLUEPRINT_CONTEXT_VAR + "}") String getVirtualHostInitialConfiguration(); - VirtualHost<?,?,?> getVirtualHost(); + VirtualHost<?> getVirtualHost(); DurableConfigurationStore getConfigurationStore(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Fri Dec 11 15:50:19 2015 @@ -68,7 +68,6 @@ import org.apache.qpid.server.security.a import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.FileBasedSettings; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNodeCreator; import org.apache.qpid.util.SystemUtils; @@ -476,10 +475,10 @@ public class BrokerAdapter extends Abstr LOGGER.debug("Assigning target sizes based on total target {}", totalTarget); long totalSize = 0l; Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes(); - Map<VirtualHost<?, ?, ?>, Long> vhs = new HashMap<>(); + Map<VirtualHost<?>, Long> vhs = new HashMap<>(); for (VirtualHostNode<?> vhn : vhns) { - VirtualHost<?, ?, ?> vh = vhn.getVirtualHost(); + VirtualHost<?> vh = vhn.getVirtualHost(); if (vh != null) { long totalQueueDepthBytes = vh.getTotalQueueDepthBytes(); @@ -502,7 +501,7 @@ public class BrokerAdapter extends Abstr } final long proportionalShare = (long) ((double) totalTarget / (double) vhs.size()); - for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet()) + for (Map.Entry<VirtualHost<?>, Long> entry : vhs.entrySet()) { long virtualHostTotalQueueSize = entry.getValue(); final long size; @@ -664,11 +663,11 @@ public class BrokerAdapter extends Abstr } @Override - public VirtualHost<?,?,?> findVirtualHostByName(String name) + public VirtualHost<?> findVirtualHostByName(String name) { for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class)) { - VirtualHost<?, ?, ?> virtualHost = virtualHostNode.getVirtualHost(); + VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost(); if (virtualHost != null && virtualHost.getName().equals(name)) { return virtualHost; @@ -779,10 +778,10 @@ public class BrokerAdapter extends Abstr for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class)) { - VirtualHost<?, ?, ?> virtualHost = virtualHostNode.getVirtualHost(); - if (virtualHost instanceof VirtualHostImpl) + VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost(); + if (virtualHost != null) { - ((VirtualHostImpl) virtualHost).resetStatistics(); + virtualHost.resetStatistics(); } } } @@ -829,16 +828,15 @@ public class BrokerAdapter extends Abstr for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class)) { - VirtualHost<?, ?, ?> virtualHost = virtualHostNode.getVirtualHost(); - if (virtualHost instanceof VirtualHostImpl) + VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost(); + if (virtualHost != null) { - VirtualHostImpl vhostImpl = (VirtualHostImpl) virtualHost; String name = virtualHost.getName(); - StatisticsCounter dataDelivered = vhostImpl.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhostImpl.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhostImpl.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhostImpl.getMessageReceiptStatistics(); - EventLogger logger = vhostImpl.getEventLogger(); + StatisticsCounter dataDelivered = virtualHost.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = virtualHost.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = virtualHost.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = virtualHost.getMessageReceiptStatistics(); + EventLogger logger = virtualHost.getEventLogger(); logger.message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Fri Dec 11 15:50:19 2015 @@ -34,8 +34,8 @@ import org.apache.qpid.server.model.Stat import org.apache.qpid.server.model.StatisticUnit; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import javax.net.ssl.SSLContext; @@ -138,7 +138,7 @@ public interface AmqpPort<X extends Amqp @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Connections") int getConnectionCount(); - VirtualHostImpl getVirtualHost(String name); + VirtualHost<?> getVirtualHost(String name); boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Fri Dec 11 15:50:19 2015 @@ -62,6 +62,7 @@ import org.apache.qpid.server.model.Prot import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.model.VirtualHostNameAlias; import org.apache.qpid.server.model.VirtualHostNode; @@ -72,7 +73,6 @@ import org.apache.qpid.server.transport. import org.apache.qpid.server.transport.TransportProvider; import org.apache.qpid.server.util.PortUtil; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider<AmqpPortImpl> implements AmqpPort<AmqpPortImpl> @@ -201,7 +201,7 @@ public class AmqpPortImpl extends Abstra } @Override - public VirtualHostImpl getVirtualHost(String name) + public VirtualHost<?> getVirtualHost(String name) { Collection<VirtualHostAlias> aliases = new TreeSet<>(VIRTUAL_HOST_ALIAS_COMPARATOR); @@ -212,7 +212,7 @@ public class AmqpPortImpl extends Abstra VirtualHostNode vhn = alias.getVirtualHostNode(name); if (vhn != null) { - return (VirtualHostImpl) vhn.getVirtualHost(); + return vhn.getVirtualHost(); } } return null; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java Fri Dec 11 15:50:19 2015 @@ -25,5 +25,5 @@ import org.apache.qpid.server.transport. public interface ConnectionValidator extends Pluggable { - boolean validateConnectionCreation(AMQPConnection<?> connection, final VirtualHost<?, ?, ?> virtualHost); + boolean validateConnectionCreation(AMQPConnection<?> connection, final VirtualHost<?> virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java Fri Dec 11 15:50:19 2015 @@ -21,12 +21,12 @@ package org.apache.qpid.server.plugin; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public interface MessageConverter<M extends ServerMessage, N extends ServerMessage> extends Pluggable { Class<M> getInputClass(); Class<N> getOutputClass(); - N convert(M message, VirtualHostImpl vhost); + N convert(M message, VirtualHost<?> vhost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java Fri Dec 11 15:50:19 2015 @@ -21,7 +21,7 @@ package org.apache.qpid.server.plugin; import org.apache.qpid.server.message.MessageNode; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public interface SystemNodeCreator extends Pluggable { @@ -32,7 +32,7 @@ public interface SystemNodeCreator exten void removeSystemNode(String name); - VirtualHostImpl getVirtualHost(); + VirtualHost<?> getVirtualHost(); boolean hasSystemNode(String name); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Dec 11 15:50:19 2015 @@ -27,8 +27,8 @@ import java.util.concurrent.ConcurrentSk import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.transport.network.Ticker; @@ -36,7 +36,7 @@ import org.apache.qpid.transport.network /** * Session model interface. * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} - * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}. + * when monitoring the blocking and blocking of queues/sessions in {@link Queue}. */ public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T> { @@ -69,9 +69,9 @@ public interface AMQSessionModel<T exten */ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose); - void block(AMQQueue queue); + void block(Queue<?> queue); - void unblock(AMQQueue queue); + void unblock(Queue<?> queue); void block(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Dec 11 15:50:19 2015 @@ -54,19 +54,16 @@ import com.google.common.util.concurrent import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.message.MessageInfo; import org.apache.qpid.server.message.MessageInfoImpl; -import org.apache.qpid.server.model.CustomRestHeaders; -import org.apache.qpid.server.model.RestContentHeader; +import org.apache.qpid.server.model.*; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; @@ -79,19 +76,6 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.AbstractConfiguredObject; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.ConfigurationChangeListener; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.ManagedAttributeField; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.QueueNotificationListener; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.StateTransition; -import org.apache.qpid.server.model.Content; import org.apache.qpid.server.plugin.MessageFilterFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -112,13 +96,12 @@ import org.apache.qpid.server.util.Delet import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.TransportException; public abstract class AbstractQueue<X extends AbstractQueue<X>> extends AbstractConfiguredObject<X> - implements AMQQueue<X>, + implements Queue<X>, StateChangeListener<QueueConsumer<?>, State>, MessageGroupManager.ConsumerResetHelper { @@ -140,7 +123,7 @@ public abstract class AbstractQueue<X ex private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l; - private final VirtualHostImpl _virtualHost; + private final VirtualHost<?> _virtualHost; private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); private final AccessControlContext _immediateDeliveryContext; @@ -231,8 +214,8 @@ public abstract class AbstractQueue<X ex private final AtomicBoolean _deleted = new AtomicBoolean(false); private final SettableFuture<Integer> _deleteFuture = SettableFuture.create(); - private final List<Action<? super AMQQueue>> _deleteTaskList = - new CopyOnWriteArrayList<Action<? super AMQQueue>>(); + private final List<Action<? super X>> _deleteTaskList = + new CopyOnWriteArrayList<>(); private LogSubject _logSubject; @@ -243,7 +226,7 @@ public abstract class AbstractQueue<X ex private final AtomicBoolean _overfull = new AtomicBoolean(false); private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker(); private final long _estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD); - private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>(); + private final CopyOnWriteArrayList<Binding<?>> _bindings = new CopyOnWriteArrayList<>(); private Map<String, Object> _arguments; /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ @@ -283,7 +266,7 @@ public abstract class AbstractQueue<X ex private boolean _closing; private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>(); - protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) + protected AbstractQueue(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(parentsMap(virtualHost), attributes); @@ -593,24 +576,26 @@ public abstract class AbstractQueue<X ex return _alternateExchange; } - public void setAlternateExchange(ExchangeImpl exchange) + public void setAlternateExchange(Exchange<?> exchange) { _alternateExchange = exchange; } + @SuppressWarnings("unused") private void postSetAlternateExchange() { - if(_alternateExchange instanceof ExchangeImpl) + if(_alternateExchange != null) { - ((ExchangeImpl)_alternateExchange).addReference(this); + _alternateExchange.addReference(this); } } + @SuppressWarnings("unused") private void preSetAlternateExchange() { - if(_alternateExchange instanceof ExchangeImpl) + if(_alternateExchange != null) { - ((ExchangeImpl)_alternateExchange).removeReference(this); + _alternateExchange.removeReference(this); } } @@ -667,7 +652,7 @@ public abstract class AbstractQueue<X ex return null; } - public VirtualHostImpl getVirtualHost() + public VirtualHost<?> getVirtualHost() { return _virtualHost; } @@ -983,6 +968,7 @@ public abstract class AbstractQueue<X ex } + @Override public Collection<QueueConsumer<?>> getConsumers() { List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>(); @@ -1006,6 +992,7 @@ public abstract class AbstractQueue<X ex } } + @Override public void resetSubPointersForGroups(final QueueEntry entry) { QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); @@ -1024,7 +1011,7 @@ public abstract class AbstractQueue<X ex deliverAsync(); } - public void addBinding(final BindingImpl binding) + public void addBinding(final Binding<?> binding) { _bindings.add(binding); int bindingCount = _bindings.size(); @@ -1039,13 +1026,13 @@ public abstract class AbstractQueue<X ex childAdded(binding); } - public void removeBinding(final BindingImpl binding) + public void removeBinding(final Binding<?> binding) { _bindings.remove(binding); childRemoved(binding); } - public Collection<BindingImpl> getBindings() + public Collection<Binding<?>> getBindings() { return Collections.unmodifiableList(_bindings); } @@ -1665,7 +1652,7 @@ public abstract class AbstractQueue<X ex } } - public int compareTo(final AMQQueue o) + public int compareTo(final X o) { return getName().compareTo(o.getName()); } @@ -1910,13 +1897,13 @@ public abstract class AbstractQueue<X ex } @Override - public void addDeleteTask(final Action<? super AMQQueue> task) + public void addDeleteTask(final Action<? super X> task) { _deleteTaskList.add(task); } @Override - public void removeDeleteTask(final Action<? super AMQQueue> task) + public void removeDeleteTask(final Action<? super X> task) { _deleteTaskList.remove(task); } @@ -1930,10 +1917,10 @@ public abstract class AbstractQueue<X ex { final int queueDepthMessages = getQueueDepthMessages(); final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>(_bindings.size()); - final ArrayList<BindingImpl> bindingCopy = new ArrayList<>(_bindings); + final ArrayList<Binding<?>> bindingCopy = new ArrayList<>(_bindings); // TODO - RG - Need to sort out bindings! - for (BindingImpl b : bindingCopy) + for (Binding<?> b : bindingCopy) { removeBindingFutures.add(b.deleteAsync()); } @@ -2009,9 +1996,9 @@ public abstract class AbstractQueue<X ex private void performQueueDeleteTasks() { - for (Action<? super AMQQueue> task : _deleteTaskList) + for (Action<? super X> task : _deleteTaskList) { - task.performAction(this); + task.performAction((X)this); } _deleteTaskList.clear(); @@ -3132,8 +3119,8 @@ public abstract class AbstractQueue<X ex if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange) { final String bindingKey = (String) attributes.get("name"); - ((ExchangeImpl)otherParents[0]).addBinding(bindingKey, this, - (Map<String,Object>) attributes.get(Binding.ARGUMENTS)); + ((Exchange<?>)otherParents[0]).addBinding(bindingKey, this, + (Map<String,Object>) attributes.get(Binding.ARGUMENTS)); for(Binding binding : _bindings) { if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey)) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java Fri Dec 11 15:50:19 2015 @@ -23,9 +23,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Queue; @ManagedObject( category = false, type= LastValueQueue.LAST_VALUE_QUEUE_TYPE) -public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X> +public interface LastValueQueue<X extends LastValueQueue<X>> extends Queue<X> { String LVQ_KEY = "lvqKey"; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java Fri Dec 11 15:50:19 2015 @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implements LastValueQueue<LastValueQueueImpl> { @@ -35,7 +35,7 @@ public class LastValueQueueImpl extends private String _lvqKey; @ManagedObjectFactoryConstructor - public LastValueQueueImpl(Map<String, Object> attributes, VirtualHostImpl virtualHost) + public LastValueQueueImpl(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Fri Dec 11 15:50:19 2015 @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; @@ -32,7 +33,7 @@ public enum NotificationCheck MESSAGE_COUNT_ALERT { - public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener) + public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener listener) { int msgCount; final long maximumMessageCount = queue.getAlertThresholdQueueDepthMessages(); @@ -49,7 +50,7 @@ public enum NotificationCheck }, MESSAGE_SIZE_ALERT(true) { - public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener) + public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener listener) { final long maximumMessageSize = queue.getAlertThresholdMessageSize(); if(maximumMessageSize != 0) @@ -73,7 +74,7 @@ public enum NotificationCheck }, QUEUE_DEPTH_ALERT { - public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener) + public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener listener) { // Check for threshold queue depth in bytes final long maximumQueueDepth = queue.getAlertThresholdQueueDepthBytes(); @@ -97,7 +98,7 @@ public enum NotificationCheck }, MESSAGE_AGE_ALERT { - public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener) + public boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener listener) { final long maxMessageAge = queue.getAlertThresholdMessageAge(); @@ -144,10 +145,10 @@ public enum NotificationCheck return _messageSpecific; } - public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener); + public abstract boolean notifyIfNecessary(ServerMessage<?> msg, Queue<?> queue, QueueNotificationListener listener); //A bit of a hack, only for use until we do the logging listener - private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg) + private static void logNotification(NotificationCheck notification, Queue<?> queue, String notificationMsg) { LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Fri Dec 11 15:50:19 2015 @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.MessageEnqueueRecord; public abstract class OrderedQueueEntryList implements QueueEntryList @@ -40,7 +41,7 @@ public abstract class OrderedQueueEntryL (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail"); - private final AMQQueue _queue; + private final Queue<?> _queue; static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry> _nextUpdater = OrderedQueueEntry._nextUpdater; @@ -50,7 +51,7 @@ public abstract class OrderedQueueEntryL private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>(); - public OrderedQueueEntryList(AMQQueue queue, HeadCreator headCreator) + public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator) { _queue = queue; _head = headCreator.createHead(this); @@ -72,7 +73,7 @@ public abstract class OrderedQueueEntryL } - public AMQQueue getQueue() + public Queue<?> getQueue() { return _queue; } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Fri Dec 11 15:50:19 2015 @@ -22,12 +22,12 @@ package org.apache.qpid.server.queue; import java.util.Map; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends AbstractQueue<X> { - protected OutOfOrderQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) + protected OutOfOrderQueue(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java Fri Dec 11 15:50:19 2015 @@ -23,9 +23,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Queue; @ManagedObject( category = false, type="priority" ) -public interface PriorityQueue<X extends PriorityQueue<X>> extends AMQQueue<X> +public interface PriorityQueue<X extends PriorityQueue<X>> extends Queue<X> { String PRIORITIES = "priorities"; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java Fri Dec 11 15:50:19 2015 @@ -27,7 +27,7 @@ import org.apache.qpid.server.logging.me import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl> { @@ -38,7 +38,7 @@ public class PriorityQueueImpl extends O private int _priorities; @ManagedObjectFactoryConstructor - public PriorityQueueImpl(Map<String, Object> attributes, VirtualHostImpl virtualHost) + public PriorityQueueImpl(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Dec 11 15:50:19 2015 @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Queue; -public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, Consumer<X> +public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X> { void flushBatched(); @@ -43,7 +43,7 @@ public interface QueueConsumer<X extends void queueDeleted(); - AMQQueue getQueue(); + Queue<?> getQueue(); boolean resend(QueueEntry e); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Dec 11 15:50:19 2015 @@ -54,6 +54,7 @@ import org.apache.qpid.server.model.Abst import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; @@ -331,7 +332,8 @@ class QueueConsumerImpl return STATE_MAP.get(_target.getState()); } - public final AMQQueue getQueue() + @Override + public final Queue<?> getQueue() { return _queue; } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Dec 11 15:50:19 2015 @@ -22,11 +22,12 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.model.Queue; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> { - AMQQueue getQueue(); + Queue<?> getQueue(); long getSize(); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Dec 11 15:50:19 2015 @@ -38,6 +38,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.LocalTransaction; @@ -161,7 +162,7 @@ public abstract class QueueEntryImpl imp return _entryId; } - public AMQQueue getQueue() + public Queue<?> getQueue() { return _queueEntryList.getQueue(); } @@ -527,7 +528,7 @@ public abstract class QueueEntryImpl imp public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn) { - final AMQQueue currentQueue = getQueue(); + final Queue<?> currentQueue = getQueue(); Exchange<?> alternateExchange = currentQueue.getAlternateExchange(); boolean autocommit = txn == null; int enqueues; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Dec 11 15:50:19 2015 @@ -21,11 +21,12 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.MessageEnqueueRecord; public interface QueueEntryList { - AMQQueue<?> getQueue(); + Queue<?> getQueue(); QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Fri Dec 11 15:50:19 2015 @@ -22,9 +22,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Queue; @ManagedObject( category = false, type= SortedQueue.SORTED_QUEUE_TYPE) -public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X> +public interface SortedQueue<X extends SortedQueue<X>> extends Queue<X> { String SORT_KEY = "sortKey"; String SORTED_QUEUE_TYPE = "sorted"; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java Fri Dec 11 15:50:19 2015 @@ -25,9 +25,9 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements SortedQueue<SortedQueueImpl> { @@ -41,7 +41,7 @@ public class SortedQueueImpl extends Out private SortedQueueEntryList _entries; @ManagedObjectFactoryConstructor - public SortedQueueImpl(Map<String, Object> attributes, VirtualHostImpl virtualHost) + public SortedQueueImpl(Map<String, Object> attributes, VirtualHost<?> virtualHost) { super(attributes, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java Fri Dec 11 15:50:19 2015 @@ -21,8 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Queue; @ManagedObject( category = false, type="standard" ) -public interface StandardQueue<X extends StandardQueue<X>> extends AMQQueue<X> +public interface StandardQueue<X extends StandardQueue<X>> extends Queue<X> { } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java Fri Dec 11 15:50:19 2015 @@ -23,14 +23,14 @@ package org.apache.qpid.server.queue; import java.util.Map; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; public class StandardQueueImpl extends AbstractQueue<StandardQueueImpl> implements StandardQueue<StandardQueueImpl> { private StandardQueueEntryList _entries; @ManagedObjectFactoryConstructor - public StandardQueueImpl(final Map<String, Object> arguments, final VirtualHostImpl virtualHost) + public StandardQueueImpl(final Map<String, Object> arguments, final VirtualHost<?> virtualHost) { super(arguments, virtualHost); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Fri Dec 11 15:50:19 2015 @@ -55,10 +55,10 @@ public class TrustStoreMessageSource ext private final TrustStore<?> _trustStore; private final AtomicReference<Set<Certificate>> _certCache = new AtomicReference<>(); - private final VirtualHost<?, ?, ?> _virtualHost; + private final VirtualHost<?> _virtualHost; - public TrustStoreMessageSource(final TrustStore<?> trustStore, final VirtualHost<?, ?, ?> virtualHost) + public TrustStoreMessageSource(final TrustStore<?> trustStore, final VirtualHost<?> virtualHost) { super(getSourceNameFromTrustStore(trustStore), virtualHost); _virtualHost = virtualHost; Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java Fri Dec 11 15:50:19 2015 @@ -31,7 +31,6 @@ import org.apache.qpid.server.model.Virt import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.SystemNodeCreator; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; @PluggableService public class TrustStoreMessageSourceCreator implements SystemNodeCreator @@ -46,7 +45,7 @@ public class TrustStoreMessageSourceCrea @Override public void register(final SystemNodeRegistry registry) { - final VirtualHostImpl<?,?,?> vhost = registry.getVirtualHost(); + final VirtualHost<?> vhost = registry.getVirtualHost(); VirtualHostNode<?> virtualHostNode = vhost.getParent(VirtualHostNode.class); final Broker<?> broker = virtualHostNode.getParent(Broker.class); @@ -115,7 +114,7 @@ public class TrustStoreMessageSourceCrea } - private boolean isTrustStoreExposedAsMessageSource(VirtualHost<?,?,?> virtualHost, final TrustStore trustStore) + private boolean isTrustStoreExposedAsMessageSource(VirtualHost<?> virtualHost, final TrustStore trustStore) { return trustStore.getState() == State.ACTIVE && trustStore.isExposedAsMessageSource() && (trustStore.getIncludedVirtualHostMessageSources().contains(virtualHost) Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1719463&r1=1719462&r2=1719463&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Dec 11 15:50:19 2015 @@ -64,7 +64,6 @@ import org.apache.qpid.server.model.port import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.network.AggregateTicker; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -346,14 +345,14 @@ public abstract class AbstractAMQPConnec { _messagesDelivered.registerEvent(1L); _dataDelivered.registerEvent(messageSize); - ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageDelivered(messageSize); + ((VirtualHost<?>)getVirtualHost()).registerMessageDelivered(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { _messagesReceived.registerEvent(1L, timestamp); _dataReceived.registerEvent(messageSize, timestamp); - ((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp); + ((VirtualHost<?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp); } public final void resetStatistics() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
