Add Queue Meta Data for DeleteOnNoConsumers,MaxConsumers
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47f47e85 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47f47e85 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47f47e85 Branch: refs/heads/ARTEMIS-780 Commit: 47f47e85b870382effd3f4a47234b94756b094b7 Parents: d70bf3b Author: Martyn Taylor <[email protected]> Authored: Mon Oct 31 13:19:02 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../impl/ActiveMQServerControlImpl.java | 1 - .../core/persistence/QueueBindingInfo.java | 7 ++ .../codec/PersistentQueueBindingEncoding.java | 43 ++++++++++- .../artemis/core/server/ActiveMQServer.java | 9 +++ .../activemq/artemis/core/server/Queue.java | 4 ++ .../artemis/core/server/QueueConfig.java | 51 ++++++++++++- .../core/server/impl/ActiveMQServerImpl.java | 76 +++++++++++++++++--- .../artemis/core/server/impl/AddressInfo.java | 5 +- .../server/impl/PostOfficeJournalLoader.java | 8 ++- 9 files changed, 185 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index b0e8b9b..fcbf15c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -573,7 +573,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); clearIO(); try { - server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 8c80a8a..4d435c6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -46,4 +46,11 @@ public interface QueueBindingInfo { List<QueueStatusEncoding> getQueueStatusEncodings(); + int getMaxConsumers(); + + void setMaxConsumers(int maxConsumers); + + boolean isDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 039460c..78a81ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -41,6 +41,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public List<QueueStatusEncoding> queueStatusEncodings; + public int maxConsumers; + + public boolean deleteOnNoConsumers; + public PersistentQueueBindingEncoding() { } @@ -57,6 +61,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin user + ", autoCreated=" + autoCreated + + ", maxConsumers=" + + maxConsumers + + ", deleteOnNoConsumers=" + + deleteOnNoConsumers + "]"; } @@ -125,6 +133,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } @Override + public int getMaxConsumers() { + return 0; + } + + @Override + public void setMaxConsumers(int maxConsumers) { + + } + + @Override + public boolean isDeleteOnNoConsumers() { + return false; + } + + @Override + public void setDeleteOnNoConsumers() { + + } + + @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); address = buffer.readSimpleString(); @@ -144,6 +172,15 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } autoCreated = buffer.readBoolean(); + + if (buffer.readableBytes() > 0) { + maxConsumers = buffer.readInt(); + deleteOnNoConsumers = buffer.readBoolean(); + } + else { + maxConsumers = -1; + deleteOnNoConsumers = false; + } } @Override @@ -153,13 +190,17 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeNullableSimpleString(filterString); buffer.writeNullableSimpleString(createMetadata()); buffer.writeBoolean(autoCreated); + buffer.writeInt(maxConsumers); + buffer.writeBoolean(deleteOnNoConsumers); } @Override public int getEncodeSize() { return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN + - SimpleString.sizeofNullableString(createMetadata()); + SimpleString.sizeofNullableString(createMetadata()) + + DataConstants.SIZE_INT + + DataConstants.SIZE_BOOLEAN; } private SimpleString createMetadata() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index a6256d8..ed45645 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -333,6 +333,15 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; + Queue deployQueue(SimpleString address, + SimpleString resourceName, + SimpleString filterString, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 52cd2f0..270e0cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -46,6 +46,10 @@ public interface Queue extends Bindable { boolean isAutoCreated(); + boolean isDeleteOnNoConsumers(); + + boolean getMaxConsumers(); + void addConsumer(Consumer consumer) throws Exception; void removeConsumer(Consumer consumer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index f750f6c..3b7ed71 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -33,6 +33,8 @@ public final class QueueConfig { private final boolean durable; private final boolean temporary; private final boolean autoCreated; + private final int maxConsumers; + private final boolean deleteOnNoConsumers; public static final class Builder { @@ -45,6 +47,8 @@ public final class QueueConfig { private boolean durable; private boolean temporary; private boolean autoCreated; + private int maxConsumers; + private boolean deleteOnNoConsumers; private Builder(final long id, final SimpleString name) { this(id, name, name); @@ -60,6 +64,8 @@ public final class QueueConfig { this.durable = true; this.temporary = false; this.autoCreated = true; + this.maxConsumers = -1; + this.deleteOnNoConsumers = false; validateState(); } @@ -106,6 +112,16 @@ public final class QueueConfig { return this; } + public Builder maxConsumers(final int maxConsumers) { + this.maxConsumers = maxConsumers; + return this; + } + + public Builder deleteOnNoConsumers(final boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + return this; + } + /** * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}. * <br> @@ -127,7 +143,7 @@ public final class QueueConfig { } else { pageSubscription = null; } - return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated); + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); } } @@ -168,7 +184,9 @@ public final class QueueConfig { final SimpleString user, final boolean durable, final boolean temporary, - final boolean autoCreated) { + final boolean autoCreated, + final int maxConsumers, + final boolean deleteOnNoConsumers) { this.id = id; this.address = address; this.name = name; @@ -178,6 +196,8 @@ public final class QueueConfig { this.durable = durable; this.temporary = temporary; this.autoCreated = autoCreated; + this.deleteOnNoConsumers = deleteOnNoConsumers; + this.maxConsumers = maxConsumers; } public long id() { @@ -216,6 +236,14 @@ public final class QueueConfig { return autoCreated; } + public boolean isDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public int maxConsumers() { + return maxConsumers; + } + @Override public boolean equals(Object o) { if (this == o) @@ -241,6 +269,10 @@ public final class QueueConfig { return false; if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null) return false; + if (maxConsumers != that.maxConsumers) + return false; + if (deleteOnNoConsumers != that.deleteOnNoConsumers) + return false; return user != null ? user.equals(that.user) : that.user == null; } @@ -256,11 +288,24 @@ public final class QueueConfig { result = 31 * result + (durable ? 1 : 0); result = 31 * result + (temporary ? 1 : 0); result = 31 * result + (autoCreated ? 1 : 0); + result = 31 * result + maxConsumers; + result = 31 * result + (deleteOnNoConsumers ? 1 : 0); return result; } @Override public String toString() { - return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}'; + return "QueueConfig{" + + "id=" + id + + ", address=" + address + + ", name=" + name + + ", filter=" + filter + + ", pageSubscription=" + pageSubscription + + ", user=" + user + + ", durable=" + durable + + ", temporary=" + temporary + + ", autoCreated=" + autoCreated + + ", maxConsumers=" + maxConsumers + + ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}'; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index cce81c5..9421df3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1509,6 +1509,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { + return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null); + } + + @Override + public Queue deployQueue(final SimpleString address, + final SimpleString resourceName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { if (resourceName.toString().toLowerCase().startsWith("jms.topic")) { ActiveMQServerLogger.LOGGER.deployTopic(resourceName); @@ -1516,7 +1528,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { ActiveMQServerLogger.LOGGER.deployQueue(resourceName); } - return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated); + return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); } @Override @@ -2102,9 +2114,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { for (CoreQueueConfiguration config : queues) { - deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false); + deployQueue(SimpleString.toSimpleString(config.getAddress()), + SimpleString.toSimpleString(config.getName()), + SimpleString.toSimpleString(config.getFilterString()), + config.isDurable(), + false, + false, + config.getMaxConsumers(), + config.getDeleteOnNoConsumers()); } } + private void deployQueuesFromConfiguration() throws Exception { deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations()); } @@ -2228,6 +2248,30 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated) throws Exception { + return createQueue(addressName, + queueName, + filterString, + user, + durable, + temporary, + ignoreIfExists, + transientQueue, + autoCreated, + null, + null); + } + + private Queue createQueue(final SimpleString addressName, + final SimpleString queueName, + final SimpleString filterString, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean ignoreIfExists, + final boolean transientQueue, + final boolean autoCreated, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { @@ -2244,21 +2288,31 @@ public class ActiveMQServerImpl implements ActiveMQServer { final long queueID = storageManager.generateID(); final QueueConfig.Builder queueConfigBuilder; + final SimpleString address; if (addressName == null) { queueConfigBuilder = QueueConfig.builderWith(queueID, queueName); + address = queueName; } else { queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName); + address = addressName; } - final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); - final Queue queue = queueFactory.createQueueWith(queueConfig); - boolean addressAlreadyExists = true; + // FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API. + AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address)); - if (postOffice.getAddressInfo(queue.getAddress()) == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress()) - .setRoutingType(AddressInfo.RoutingType.MULTICAST)); - addressAlreadyExists = false; - } + final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; + final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers; + + final QueueConfig queueConfig = queueConfigBuilder.filter(filter) + .pagingManager(pagingManager) + .user(user) + .durable(durable) + .temporary(temporary) + .autoCreated(autoCreated) + .deleteOnNoConsumers(isDeleteOnNoConsumers) + .maxConsumers(noMaxConsumers) + .build(); + final Queue queue = queueFactory.createQueueWith(queueConfig); if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); @@ -2270,7 +2324,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (queue.isDurable()) { storageManager.addQueueBinding(txID, localQueueBinding); - if (!addressAlreadyExists) { + if (info == null) { storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress())); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 4e982c4..7c71c1f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; public class AddressInfo { @@ -24,9 +25,9 @@ public class AddressInfo { private RoutingType routingType = RoutingType.MULTICAST; - private boolean defaultDeleteOnNoConsumers; + private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(); - private int defaultMaxConsumers; + private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); public AddressInfo(SimpleString name) { this.name = name; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 4e89e8a..9bd14f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -143,7 +143,13 @@ public class PostOfficeJournalLoader implements JournalLoader { } else { queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress()); } - queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated()); + queueConfigBuilder.filter(filter).pagingManager(pagingManager) + .user(queueBindingInfo.getUser()) + .durable(true) + .temporary(false) + .autoCreated(queueBindingInfo.isAutoCreated()) + .de + ); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
