Implemented MaxConsumers DeleteOnNoConsumers for Queues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/541e4e0a Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/541e4e0a Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/541e4e0a Branch: refs/heads/ARTEMIS-780 Commit: 541e4e0a5178b61d1fba145e7d203824be4223e1 Parents: 47f47e8 Author: Martyn Taylor <[email protected]> Authored: Tue Nov 1 10:19:55 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../artemis/api/core/ActiveMQExceptionType.java | 8 +- .../ActiveMQQueueMaxConsumerLimitReached.java | 31 ++++++ .../core/ServerSessionPacketHandler.java | 8 ++ .../core/server/ActiveMQMessageBundle.java | 3 + .../artemis/core/server/ActiveMQServer.java | 26 ++++- .../activemq/artemis/core/server/Queue.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 50 +++++++--- .../core/server/impl/LastValueQueue.java | 4 +- .../server/impl/PostOfficeJournalLoader.java | 4 +- .../core/server/impl/QueueFactoryImpl.java | 6 +- .../artemis/core/server/impl/QueueImpl.java | 63 ++++++++++++ .../core/server/impl/ServerConsumerImpl.java | 1 + .../impl/ScheduledDeliveryHandlerTest.java | 10 ++ .../integration/addressing/AddressingTest.java | 100 ++++++++++++++++--- .../integration/client/HangConsumerTest.java | 8 +- .../unit/core/postoffice/impl/FakeQueue.java | 10 ++ 16 files changed, 290 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 752574a..0221562 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -213,7 +213,13 @@ public enum ActiveMQExceptionType { } }, - NOT_IMPLEMTNED_EXCEPTION(213); + NOT_IMPLEMTNED_EXCEPTION(213), + MAX_CONSUMER_LIMIT_EXCEEDED(214) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQQueueMaxConsumerLimitReached(msg); + } + }; private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java new file mode 100644 index 0000000..0577e08 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because the maximum number of consumers allowed on a queue has been reached. + */ +public final class ActiveMQQueueMaxConsumerLimitReached extends ActiveMQException { + + public ActiveMQQueueMaxConsumerLimitReached() { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED); + } + + public ActiveMQQueueMaxConsumerLimitReached(String msg) { + super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index b52534c..2a45f29 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; 
import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -494,6 +495,13 @@ public class ServerSessionPacketHandler implements ChannelHandler { } else { ActiveMQServerLogger.LOGGER.caughtXaException(e); } + } catch (ActiveMQQueueMaxConsumerLimitReached e) { + if (requiresResponse) { + logger.debug("Sending exception to client", e); + response = new ActiveMQExceptionMessage(e); + } else { + ActiveMQServerLogger.LOGGER.caughtException(e); + } } catch (ActiveMQException e) { if (requiresResponse) { logger.debug("Sending exception to client", e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index f22873b..769d183 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionExcep import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException; import 
org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; @@ -378,4 +379,6 @@ public interface ActiveMQMessageBundle { @Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.") ActiveMQIOErrorException diskBeyondLimit(); + @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT) + ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index ed45645..749969a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -301,6 +301,14 @@ public interface ActiveMQServer extends ActiveMQComponent { Queue createQueue(SimpleString address, SimpleString queueName, + SimpleString filterString, + boolean durable, + boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + + Queue createQueue(SimpleString address, + SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, @@ -393,10 +401,22 @@ public interface ActiveMQServer extends ActiveMQComponent { AddressInfo getAddressInfo(SimpleString address); + Queue createQueue(SimpleString addressName, + SimpleString queueName, + SimpleString filterString, + SimpleString user, + boolean durable, + boolean temporary, + boolean ignoreIfExists, + boolean transientQueue, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) 
throws Exception; + /* - * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will - * replace any factories with the same protocol - * */ + * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is true then this factory will + * replace any factories with the same protocol + * */ void addProtocolManagerFactory(ProtocolManagerFactory factory); /* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 270e0cd..2b845d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -48,7 +48,7 @@ public interface Queue extends Bindable { boolean isDeleteOnNoConsumers(); - boolean getMaxConsumers(); + int getMaxConsumers(); void addConsumer(Consumer consumer) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9421df3..ba63bb3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1433,6 +1433,17 @@ public class ActiveMQServerImpl implements 
ActiveMQServer { public Queue createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, + final boolean durable, + final boolean temporary, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + } + + @Override + public Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, final SimpleString user, final boolean durable, final boolean temporary) throws Exception { @@ -2261,17 +2272,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { null); } - private Queue createQueue(final SimpleString addressName, - final SimpleString queueName, - final SimpleString filterString, - final SimpleString user, - final boolean durable, - final boolean temporary, - final boolean ignoreIfExists, - final boolean transientQueue, - final boolean autoCreated, - final Integer maxConsumers, - final Boolean deleteOnNoConsumers) throws Exception { + @Override + public Queue createQueue(final SimpleString addressName, + final SimpleString queueName, + final SimpleString filterString, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean ignoreIfExists, + final boolean transientQueue, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { @@ -2297,8 +2309,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { address = addressName; } + + AddressInfo defaultAddressInfo = new AddressInfo(address); // FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API. 
- AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address)); + AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo); + + boolean addressExists = true; + if (info == null) { + info = defaultAddressInfo; + addressExists = false; + } final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers; @@ -2323,10 +2343,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); if (queue.isDurable()) { - storageManager.addQueueBinding(txID, localQueueBinding); - if (info == null) { - storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress())); + if (!addressExists) { + storageManager.addAddressBinding(txID, getAddressInfo(address)); } + storageManager.addQueueBinding(txID, localQueueBinding); } try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 453f588..a4fa5dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -56,12 +56,14 @@ public class LastValueQueue extends QueueImpl { final boolean durable, final boolean temporary, final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final 
StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final Executor executor) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); new Exception("LastValueQeue " + this).toString(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 9bd14f0..6f4cf03 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -148,8 +148,8 @@ public class PostOfficeJournalLoader implements JournalLoader { .durable(true) .temporary(false) .autoCreated(queueBindingInfo.isAutoCreated()) - .de - ); + .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers()) + .maxConsumers(queueBindingInfo.getMaxConsumers()); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 3678553..bcc7c79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory { final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), 
config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } return queue; } @@ -101,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } else { queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e01c81e..a37bb50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; +import 
org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.AddressManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; @@ -238,6 +240,14 @@ public class QueueImpl implements Queue { private SlowConsumerReaperRunnable slowConsumerReaperRunnable; + private int maxConsumers; + + private boolean deleteOnNoConsumers; + + private final AddressInfo addressInfo; + + private final AtomicInteger noConsumers = new AtomicInteger(0); + /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -334,10 +344,32 @@ public class QueueImpl implements Queue { final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final Executor executor) { + this(id, address, name, filter, null, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + } + + public QueueImpl(final long id, + final SimpleString address, + final SimpleString name, + final Filter filter, + final PageSubscription pageSubscription, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, + final ScheduledExecutorService scheduledExecutor, + final PostOffice 
postOffice, + final StorageManager storageManager, + final HierarchicalRepository<AddressSettings> addressSettingsRepository, + final Executor executor) { + this.id = id; this.address = address; + this.addressInfo = postOffice.getAddressInfo(address); + this.name = name; this.filter = filter; @@ -350,6 +382,10 @@ public class QueueImpl implements Queue { this.autoCreated = autoCreated; + this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers; + + this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; + this.postOffice = postOffice; this.storageManager = storageManager; @@ -437,6 +473,16 @@ public class QueueImpl implements Queue { } @Override + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + @Override + public int getMaxConsumers() { + return maxConsumers; + } + + @Override public SimpleString getName() { return name; } @@ -709,6 +755,11 @@ public class QueueImpl implements Queue { } synchronized (this) { + + if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) { + throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); + } + flushDeliveriesInTransit(); consumersChanged = true; @@ -722,6 +773,8 @@ public class QueueImpl implements Queue { if (refCountForConsumers != null) { refCountForConsumers.increment(); } + + noConsumers.incrementAndGet(); } } @@ -770,6 +823,15 @@ public class QueueImpl implements Queue { if (refCountForConsumers != null) { refCountForConsumers.decrement(); } + + if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) { + try { + deleteQueue(); + } + catch (Exception e) { + logger.error("Error deleting queue on no consumers. 
" + this.toString(), e); + } + } } } @@ -1361,6 +1423,7 @@ public class QueueImpl implements Queue { @Override public void deleteQueue(boolean removeConsumers) throws Exception { synchronized (this) { + if (this.queueDestroyed) return; this.queueDestroyed = true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 98a9c84..389b07e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -205,6 +205,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.creationTime = System.currentTimeMillis(); + if (browseOnly) { browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); } else { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 55a287a..11b11ab 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -901,6 +901,16 @@ public class ScheduledDeliveryHandlerTest 
extends Assert { } @Override + public boolean isDeleteOnNoConsumers() { + return false; + } + + @Override + public int getMaxConsumers() { + return -1; + } + + @Override public void addConsumer(Consumer consumer) throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 03739e9..a21a62b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -19,8 +19,11 @@ package org.apache.activemq.artemis.tests.integration.addressing; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -187,8 +190,6 @@ public class AddressingTest extends ActiveMQTestBase { assertEquals(0, count); } - - @Test public void testMulticastRoutingBackwardsCompat() throws Exception { @@ -222,34 +223,103 @@ public class AddressingTest extends ActiveMQTestBase { } } - @Ignore @Test - public void testDeleteQueueOnNoConsumersTrue() { - fail("Not Implemented"); + public void testDeleteQueueOnNoConsumersTrue() throws Exception { + + 
SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = true; + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + consumer1.close(); + + assertFalse(server.queueQuery(queueName).isExists()); } - @Ignore @Test - public void testDeleteQueueOnNoConsumersFalse() { - fail("Not Implemented"); + public void testDeleteQueueOnNoConsumersFalse() throws Exception { + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + consumer1.close(); + + assertTrue(server.queueQuery(queueName).isExists()); } - @Ignore @Test - public void testLimitOnMaxConsumers() { - fail("Not Implemented"); + public void testLimitOnMaxConsumers() throws Exception { + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers); + + Exception expectedException = null; + String expectedMessage = "Maximum Consumer Limit Reached 
on Queue"; + try { + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + } + catch (ActiveMQQueueMaxConsumerLimitReached e) { + expectedException = e; + } + + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains(expectedMessage)); + assertTrue(expectedException.getMessage().contains(address)); + assertTrue(expectedException.getMessage().contains(queueName)); } @Ignore @Test - public void testUnlimitedMaxConsumers() { - fail("Not Implemented"); + public void testUnlimitedMaxConsumers() throws Exception { + int noConsumers = 50; + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + for (int i = 0; i < noConsumers; i++) { + session.createConsumer(q1.getName()); + } } @Ignore @Test - public void testDefaultMaxConsumersFromAddress() { - fail("Not Implemented"); + public void testDefaultMaxConsumersFromAddress() throws Exception { + int noConsumers = 50; + SimpleString address = new SimpleString("test.address"); + SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); + // For each address, create 2 Queues with the same address, assert both queues receive message + boolean deleteOnNoConsumers = false; + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.setDefaultMaxConsumers(0); + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + for (int i = 0; i < noConsumers; i++) { + 
session.createConsumer(q1.getName()); + } } @Ignore http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 2fd5915..124ece3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -224,12 +224,14 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final Executor executor) { - super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); + super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor); } @Override @@ -256,7 +258,7 @@ public class HangConsumerTest extends ActiveMQTestBase { @Override public Queue createQueueWith(final QueueConfig config) { - queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), 
config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); return queue; } @@ -271,7 +273,7 @@ public class HangConsumerTest extends ActiveMQTestBase { final boolean durable, final boolean temporary, final boolean autoCreated) { - queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); return queue; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 9a20d70..ef5c05e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -443,6 +443,16 @@ public class FakeQueue implements Queue { } @Override + public boolean 
isDeleteOnNoConsumers() { + return false; + } + + @Override + public int getMaxConsumers() { + return -1; + } + + @Override public LinkedListIterator<MessageReference> iterator() { // no-op return null;
