Repository: activemq-artemis Updated Branches: refs/heads/master 71390fea4 -> 4507d7783
ARTEMIS-2081 listConfiguredQueues returns only queues created by config Extend test case to reproduce problem of client created queues being incorrectly removed on simple reload of config. Add a flag/field to the queues created by configuration/broker.xml so we can correctly filter only queues created/managed by config. Update listConfiguredQueues to use the new queue flag Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c417d0b5 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c417d0b5 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c417d0b5 Branch: refs/heads/master Commit: c417d0b5f8ac67b1c9bde67cd6d6300d4194e2f8 Parents: 71390fe Author: Michael André Pearce <[email protected]> Authored: Sat Sep 8 00:23:55 2018 +0100 Committer: Clebert Suconic <[email protected]> Committed: Tue Sep 11 13:55:11 2018 -0400 ---------------------------------------------------------------------- .../api/core/management/QueueControl.java | 6 +++ .../core/management/impl/QueueControlImpl.java | 12 ++++++ .../core/persistence/QueueBindingInfo.java | 4 ++ .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 27 ++++++++++++- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java | 7 +++- .../activemq/artemis/core/server/Queue.java | 4 ++ .../artemis/core/server/QueueConfig.java | 24 ++++++++++-- .../core/server/impl/ActiveMQServerImpl.java | 41 ++++++++++++++++++-- .../core/server/impl/LastValueQueue.java | 3 +- .../server/impl/PostOfficeJournalLoader.java | 3 +- .../core/server/impl/QueueFactoryImpl.java | 7 ++-- .../artemis/core/server/impl/QueueImpl.java | 21 ++++++++-- .../impl/ScheduledDeliveryHandlerTest.java | 10 +++++ .../tests/integration/jms/RedeployTest.java | 10 +++++ .../management/QueueControlUsingCoreTest.java | 5 +++ 
.../resources/reload-address-queues-updated.xml | 4 ++ .../test/resources/reload-address-queues.xml | 4 ++ .../unit/core/postoffice/impl/FakeQueue.java | 10 +++++ .../core/server/impl/fakes/FakePostOffice.java | 3 +- 21 files changed, 189 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index d213446..b210530 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -225,6 +225,12 @@ public interface QueueControl { /** * */ + @Attribute(desc = "is this queue managed by configuration (broker.xml)") + boolean isConfigurationManaged(); + + /** + * + */ @Attribute(desc = "If the queue should route exclusively to one consumer") boolean isExclusive(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 7377846..3db5cae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -490,6 +490,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override + public boolean isConfigurationManaged() { + checkStarted(); + + clearIO(); + try { + return queue.isConfigurationManaged(); + } finally { + blockOnIO(); + } + } + + @Override public boolean isExclusive() { checkStarted(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index ebc86fc..9d7bb7e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -40,6 +40,10 @@ public interface QueueBindingInfo { boolean isAutoCreated(); + boolean isConfigurationManaged(); + + void setConfigurationManaged(boolean configurationManaged); + SimpleString getUser(); void addQueueStatusEncoding(QueueStatusEncoding status); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 8c3cc77..39511f1 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1290,7 +1290,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged()); readLock(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 0cfe67c..a7d5216 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -56,6 +56,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public byte routingType; + public boolean configurationManaged; + public PersistentQueueBindingEncoding() { } @@ -86,6 +88,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin delayBeforeDispatch + ", routingType=" + routingType + + ", configurationManaged=" + + configurationManaged + "]"; } @@ -100,7 +104,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin final boolean lastValue, final int consumersBeforeDispatch, final long delayBeforeDispatch, - final byte routingType) { + final byte routingType, + final boolean configurationManaged) { this.name = name; this.address = address; this.filterString = filterString; @@ -113,6 +118,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin this.consumersBeforeDispatch = consumersBeforeDispatch; this.delayBeforeDispatch = delayBeforeDispatch; this.routingType = routingType; + this.configurationManaged = configurationManaged; } @Override @@ -155,6 +161,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } @Override + public boolean isConfigurationManaged() { + return configurationManaged; + } + + @Override + public void setConfigurationManaged(boolean configurationManaged) { + this.configurationManaged = configurationManaged; + } + + @Override public void addQueueStatusEncoding(QueueStatusEncoding status) { if (queueStatusEncodings == null) { queueStatusEncodings = new LinkedList<>(); @@ -288,6 +304,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin } else { delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); } + if (buffer.readableBytes() > 0) { + configurationManaged = buffer.readBoolean(); + } else 
{ + configurationManaged = false; + } } @Override @@ -304,6 +325,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin buffer.writeBoolean(lastValue); buffer.writeInt(consumersBeforeDispatch); buffer.writeLong(delayBeforeDispatch); + buffer.writeBoolean(configurationManaged); } @Override @@ -317,7 +339,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_INT + - DataConstants.SIZE_LONG; + DataConstants.SIZE_LONG + + DataConstants.SIZE_BOOLEAN; } private SimpleString createMetadata() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index d31e33b..6ed91b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -73,7 +73,8 @@ public interface PostOffice extends ActiveMQComponent { Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, - SimpleString user) throws Exception; + SimpleString user, + Boolean configurationManaged) throws Exception; List<Queue> listQueuesForAddress(SimpleString address) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index d4cde18..02abf46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -471,7 +471,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, - SimpleString user) throws Exception { + SimpleString user, + Boolean configurationManaged) throws Exception { synchronized (addressLock) { final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name); if (queueBinding == null) { @@ -527,6 +528,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding changed = true; queue.setFilter(filter); } + if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) { + changed = true; + queue.setConfigurationManaged(configurationManaged); + } if (logger.isDebugEnabled()) { if (user == null && queue.getUser() != null) { logger.debug("Ignoring updating Queue to a NULL user"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 63d39c7..a8f1095 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -94,6 +94,10 @@ public interface Queue extends Bindable,CriticalComponent { void setMaxConsumer(int maxConsumers); + boolean isConfigurationManaged(); + + void 
setConfigurationManaged(boolean configurationManaged); + void addConsumer(Consumer consumer) throws Exception; void removeConsumer(Consumer consumer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index c83e08a..c79114d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -42,6 +42,7 @@ public final class QueueConfig { private final boolean purgeOnNoConsumers; private final int consumersBeforeDispatch; private final long delayBeforeDispatch; + private final boolean configurationManaged; public static final class Builder { @@ -61,6 +62,7 @@ public final class QueueConfig { private boolean purgeOnNoConsumers; private int consumersBeforeDispatch; private long delayBeforeDispatch; + private boolean configurationManaged; private Builder(final long id, final SimpleString name) { this(id, name, name); @@ -83,6 +85,7 @@ public final class QueueConfig { this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(); this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); + this.configurationManaged = false; validateState(); } @@ -99,6 +102,11 @@ public final class QueueConfig { } } + public Builder configurationManaged(final boolean configurationManaged) { + this.configurationManaged = configurationManaged; + return this; + } + public Builder filter(final Filter filter) { this.filter = filter; return this; @@ -185,7 +193,7 
@@ public final class QueueConfig { } else { pageSubscription = null; } - return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers); + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged); } } @@ -233,7 +241,8 @@ public final class QueueConfig { final boolean lastValue, final int consumersBeforeDispatch, final long delayBeforeDispatch, - final boolean purgeOnNoConsumers) { + final boolean purgeOnNoConsumers, + final boolean configurationManaged) { this.id = id; this.address = address; this.name = name; @@ -250,6 +259,7 @@ public final class QueueConfig { this.maxConsumers = maxConsumers; this.consumersBeforeDispatch = consumersBeforeDispatch; this.delayBeforeDispatch = delayBeforeDispatch; + this.configurationManaged = configurationManaged; } public long id() { @@ -316,6 +326,10 @@ public final class QueueConfig { return delayBeforeDispatch; } + public boolean isConfigurationManaged() { + return configurationManaged; + } + @Override public boolean equals(Object o) { if (this == o) @@ -357,6 +371,8 @@ public final class QueueConfig { return false; if (purgeOnNoConsumers != that.purgeOnNoConsumers) return false; + if (configurationManaged != that.configurationManaged) + return false; return user != null ? user.equals(that.user) : that.user == null; } @@ -379,6 +395,7 @@ public final class QueueConfig { result = 31 * result + consumersBeforeDispatch; result = 31 * result + Long.hashCode(delayBeforeDispatch); result = 31 * result + (purgeOnNoConsumers ? 1 : 0); + result = 31 * result + (configurationManaged ? 
1 : 0); return result; } @@ -400,6 +417,7 @@ public final class QueueConfig { + ", lastValue=" + lastValue + ", consumersBeforeDispatch=" + consumersBeforeDispatch + ", delayBeforeDispatch=" + delayBeforeDispatch - + ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}'; + + ", purgeOnNoConsumers=" + purgeOnNoConsumers + + ", configurationManaged=" + configurationManaged + '}'; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index dcb6e02..b5a49ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2729,7 +2729,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } private List<Queue> listConfiguredQueues(SimpleString address) throws Exception { - return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList()); + return listQueues(address).stream().filter(queue -> queue.isConfigurationManaged()).collect(Collectors.toList()); } private List<Queue> listQueues(SimpleString address) throws Exception { @@ -2794,7 +2794,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } else { // if the address::queue doesn't exist then create it try { - createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, 
config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true); + createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true, true); } catch (ActiveMQQueueExistsException e) { // the queue may exist on a *different* address ActiveMQServerLogger.LOGGER.warn(e.getMessage()); @@ -3117,6 +3117,27 @@ public class ActiveMQServerImpl implements ActiveMQServer { final int consumersBeforeDispatch, final long delayBeforeDispatch, final boolean autoCreateAddress) throws Exception { + return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false); + } + + private Queue createQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName, + final SimpleString filterString, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean ignoreIfExists, + final boolean transientQueue, + final boolean autoCreated, + final int maxConsumers, + final boolean purgeOnNoConsumers, + final boolean exclusive, + final boolean lastValue, + final int consumersBeforeDispatch, + final long delayBeforeDispatch, + final boolean autoCreateAddress, + final boolean configurationManaged) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { @@ -3170,6 +3191,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { .lastValue(lastValue) .consumersBeforeDispatch(consumersBeforeDispatch) 
.delayBeforeDispatch(delayBeforeDispatch) + .configurationManaged(configurationManaged) .build(); if (hasBrokerQueuePlugins()) { @@ -3265,8 +3287,21 @@ public class ActiveMQServerImpl implements ActiveMQServer { Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception { + return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user, null); + } + + private Queue updateQueue(String name, + RoutingType routingType, + String filterString, + Integer maxConsumers, + Boolean purgeOnNoConsumers, + Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, + String user, + Boolean configurationManaged) throws Exception { final Filter filter = FilterImpl.createFilter(filterString); - final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user)); + final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged); if (queueBinding != null) { final Queue queue = queueBinding.getQueue(); return queue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 7c2ffee..fd62749 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -66,6 +66,7 @@ public class LastValueQueue extends QueueImpl { final Integer consumersBeforeDispatch, final Long delayBeforeDispatch, final Boolean purgeOnNoConsumers, + final boolean configurationManaged, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -73,7 +74,7 @@ public class LastValueQueue extends QueueImpl { final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 59a318c..59b1649 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -155,7 +155,8 @@ public class PostOfficeJournalLoader 
implements JournalLoader { .lastValue(queueBindingInfo.isLastValue()) .consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch()) .delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch()) - .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())); + .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())) + .configurationManaged((queueBindingInfo.isConfigurationManaged())); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 24b36e6..c8835d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -74,11 +74,10 @@ public class QueueFactoryImpl implements QueueFactory { public Queue createQueueWith(final QueueConfig config) { final Queue queue; if (config.isLastValue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new 
LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } - server.getCriticalAnalyzer().add(queue); return queue; } @@ -102,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory { Queue queue; if (addressSettings.isDefaultLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), 
ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2891350..69d4336 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -279,9 +279,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public volatile long dispatchStartTime = -1; - private int consumersBeforeDispatch = 0; + private 
volatile int consumersBeforeDispatch = 0; - private long delayBeforeDispatch = 0; + private volatile long delayBeforeDispatch = 0; + + private volatile boolean configurationManaged; /** @@ -429,7 +431,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { - this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); } public QueueImpl(final long id, @@ -447,6 +449,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final Integer consumersBeforeDispatch, final Long delayBeforeDispatch, final Boolean purgeOnNoConsumers, + final boolean configurationManaged, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, final StorageManager storageManager, @@ -486,6 +489,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.delayBeforeDispatch = delayBeforeDispatch == null ? 
ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch; + this.configurationManaged = configurationManaged; + this.postOffice = postOffice; this.storageManager = storageManager; @@ -663,6 +668,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override + public boolean isConfigurationManaged() { + return configurationManaged; + } + + @Override + public synchronized void setConfigurationManaged(boolean configurationManaged) { + this.configurationManaged = configurationManaged; + } + + @Override public SimpleString getName() { return name; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index b21781d..3561e6f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -834,6 +834,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public boolean isConfigurationManaged() { + return false; + } + + @Override + public void setConfigurationManaged(boolean configurationManaged) { + + } + + @Override public void recheckRefCount(OperationContext context) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java ---------------------------------------------------------------------- diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index 07bc22e..cec56a0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -320,8 +320,15 @@ public class RedeployTest extends ActiveMQTestBase { embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + try (JMSContext jmsContext = connectionFactory.createContext()) { + jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100); + } + try { latch.await(10, TimeUnit.SECONDS); + Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub")); + Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue")); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal")); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal")); @@ -344,6 +351,9 @@ public class RedeployTest extends ActiveMQTestBase { embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); latch.await(10, TimeUnit.SECONDS); + //Ensure queues created by clients (NOT by broker.xml) are not removed when we reload.
+ Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub")); + Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue")); Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal")); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal")); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 0a62334..26ada03 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -147,6 +147,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public boolean isConfigurationManaged() { + return (Boolean) proxy.retrieveAttributeValue("configurationManaged"); + } + + @Override public boolean isExclusive() { return (Boolean) proxy.retrieveAttributeValue("exclusive"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml index f8d1d91..fd73f11 100644 --- 
a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml +++ b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml @@ -117,6 +117,10 @@ under the License. </wildcard-addresses> <addresses> + <address name="config_test_consumer_created_queues"> + <multicast> + </multicast> + </address> <address name="config_test_queue_removal"> <multicast> <queue name="config_test_queue_removal_queue_1"/> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/resources/reload-address-queues.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-address-queues.xml b/tests/integration-tests/src/test/resources/reload-address-queues.xml index ebd0f4e..74c9d08 100644 --- a/tests/integration-tests/src/test/resources/reload-address-queues.xml +++ b/tests/integration-tests/src/test/resources/reload-address-queues.xml @@ -120,6 +120,10 @@ under the License. 
</wildcard-addresses> <addresses> + <address name="config_test_consumer_created_queues"> + <multicast> + </multicast> + </address> <address name="config_test_queue_removal"> <multicast> <queue name="config_test_queue_removal_queue_1"/> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 5297ab6..7c1297d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -106,6 +106,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public boolean isConfigurationManaged() { + return false; + } + + @Override + public void setConfigurationManaged(boolean configurationManaged) { + + } + + @Override public boolean isInternalQueue() { // no-op return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 0bbe8ef..5f128ea 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -53,7 +53,8 @@ public class FakePostOffice implements PostOffice { Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, - SimpleString user) throws Exception { + SimpleString user, + Boolean configurationManaged) throws Exception { return null; }
