This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b76c672305 ARTEMIS-4162 support deleting addresses & queues w/o usage
check
b76c672305 is described below
commit b76c67230538ff3b366793f7194bc4539b7620e7
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Feb 8 15:10:24 2023 -0600
ARTEMIS-4162 support deleting addresses & queues w/o usage check
There are certain use-cases where addresses will be auto-created and
never have a direct binding created on them. Because of this they will
never be auto-deleted. If a large number of these addresses build up
they will consume a problematic amount of heap space.
One specific example of this use-case is an MQTT subscriber with a
wild-card subscription and a large number of MQTT producers sending one
or two messages a large number of different MQTT topics covered by the
wild-card. Since no bindings are ever created on any of these individual
addresses (e.g. from a subscription queue) they will never be
auto-deleted, but they will eventually consume a large amount of heap.
The only way to deal with these addresses is to manually delete them.
There are also situations where queues may be created and never have
any messages sent to them or never have a consumer connect. These
queues will never be auto-deleted so they must be deleted manually.
This commit adds the ability to configure the broker to skip the usage
check so that these kinds of addresses and queues can be deleted
automatically.
---
.../deployers/impl/FileConfigurationParser.java | 10 ++-
.../artemis/core/postoffice/AddressManager.java | 7 +-
.../core/postoffice/impl/PostOfficeImpl.java | 19 ++---
.../core/postoffice/impl/SimpleAddressManager.java | 19 ++---
.../artemis/core/server/ActiveMQServerLogger.java | 4 +-
.../apache/activemq/artemis/core/server/Queue.java | 4 ++
.../artemis/core/server/impl/AddressInfo.java | 19 ++++-
.../artemis/core/server/impl/QueueImpl.java | 9 +++
.../artemis/core/server/impl/QueueManagerImpl.java | 9 +--
.../core/settings/impl/AddressSettings.java | 63 +++++++++++++++++
.../resources/schema/artemis-configuration.xsd | 16 +++++
.../core/config/impl/FileConfigurationTest.java | 4 ++
.../postoffice/impl/PostOfficeTestAccessor.java | 5 ++
.../resources/ConfigurationTest-full-config.xml | 2 +
...rationTest-xinclude-config-address-settings.xml | 2 +
...est-xinclude-schema-config-address-settings.xml | 2 +
.../org/apache/activemq/artemis/utils/Wait.java | 2 +-
docs/user-manual/en/address-settings.md | 20 ++++++
.../integration/client/AutoDeleteAddressTest.java | 80 +++++++++++++++++++++-
.../integration/client/AutoDeleteQueueTest.java | 29 +++++++-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 43 ++++++++++++
21 files changed, 323 insertions(+), 45 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8461db483c..c8dbf6e692 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -287,6 +287,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String AUTO_DELETE_QUEUES_MESSAGE_COUNT =
"auto-delete-queues-message-count";
+ private static final String AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK =
"auto-delete-queues-skip-usage-check";
+
private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";
private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";
@@ -295,6 +297,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String AUTO_DELETE_ADDRESSES_DELAY =
"auto-delete-addresses-delay";
+ private static final String AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK =
"auto-delete-addresses-skip-usage-check";
+
private static final String CONFIG_DELETE_ADDRESSES =
"config-delete-addresses";
private static final String CONFIG_DELETE_DIVERTS = "config-delete-diverts";
@@ -337,8 +341,6 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String ENABLE_INGRESS_TIMESTAMP =
"enable-ingress-timestamp";
- private static final String SUPPRESS_SESSION_NOTIFICATIONS =
"suppress-session-notifications";
-
private boolean validateAIO = false;
private boolean printPageMaxSizeUsed = false;
@@ -1372,6 +1374,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
long autoDeleteQueuesMessageCount = XMLUtil.parseLong(child);
Validators.MINUS_ONE_OR_GE_ZERO.validate(AUTO_DELETE_QUEUES_MESSAGE_COUNT,
autoDeleteQueuesMessageCount);
addressSettings.setAutoDeleteQueuesMessageCount(autoDeleteQueuesMessageCount);
+ } else if
(AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
+
addressSettings.setAutoDeleteQueuesSkipUsageCheck(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES,
value);
@@ -1385,6 +1389,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
long autoDeleteAddressesDelay = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(AUTO_DELETE_ADDRESSES_DELAY,
autoDeleteAddressesDelay);
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
+ } else if
(AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK.equalsIgnoreCase(name)) {
+
addressSettings.setAutoDeleteAddressesSkipUsageCheck(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES,
value);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index e7720da333..d9090d5b74 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -83,13 +83,8 @@ public interface AddressManager {
void scanAddresses(MirrorController mirrorController) throws Exception;
- boolean checkAutoRemoveAddress(SimpleString address,
- AddressInfo addressInfo,
+ boolean checkAutoRemoveAddress(AddressInfo addressInfo,
AddressSettings settings,
boolean ignoreDelay) throws Exception;
- boolean checkAutoRemoveAddress(SimpleString address,
- AddressInfo addressInfo,
- AddressSettings settings) throws Exception;
-
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 3950a4dcaa..94be7a8a58 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1038,8 +1038,8 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
@Override
public boolean isAddressBound(final SimpleString address) throws Exception {
- Bindings bindings = lookupBindingsForAddress(address);
- return bindings != null && !bindings.getBindings().isEmpty();
+ Collection<Binding> bindings = getDirectBindings(address);
+ return bindings != null && !bindings.isEmpty();
}
@Override
@@ -1970,15 +1970,16 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
- private static boolean queueWasUsed(Queue queue) {
- return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged()
> 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() !=
-1;
+ private static boolean queueWasUsed(Queue queue, AddressSettings settings) {
+ return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged()
> 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() !=
-1 || settings.getAutoDeleteQueuesSkipUsageCheck();
}
/** To be used by the AddressQueueReaper.
* It is also exposed for tests through PostOfficeTestAccessor */
void reapAddresses(boolean initialCheck) {
getLocalQueues().forEach(queue -> {
- if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue)
&& QueueManagerImpl.consumerCountCheck(queue) && (initialCheck ||
QueueManagerImpl.delayCheck(queue)) &&
QueueManagerImpl.messageCountCheck(queue) && (initialCheck ||
queueWasUsed(queue))) {
+ AddressSettings settings =
addressSettingsRepository.getMatch(queue.getAddress().toString());
+ if (!queue.isInternalQueue() && queue.isAutoDelete() &&
QueueManagerImpl.consumerCountCheck(queue) && (initialCheck ||
QueueManagerImpl.delayCheck(queue, settings)) &&
QueueManagerImpl.messageCountCheck(queue) && (initialCheck ||
queueWasUsed(queue, settings))) {
if (initialCheck || queue.isSwept()) {
if (logger.isDebugEnabled()) {
if (initialCheck) {
@@ -2003,7 +2004,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
AddressSettings settings =
addressSettingsRepository.getMatch(address.toString());
try {
- if (addressManager.checkAutoRemoveAddress(address, addressInfo,
settings, initialCheck)) {
+ if (addressManager.checkAutoRemoveAddress(addressInfo, settings,
initialCheck)) {
if (initialCheck || addressInfo.isSwept()) {
server.autoRemoveAddressInfo(address, null);
@@ -2033,12 +2034,6 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
- public boolean checkAutoRemoveAddress(SimpleString address,
- AddressInfo addressInfo,
- AddressSettings settings) throws Exception {
- return settings.isAutoDeleteAddresses() && addressInfo != null &&
addressInfo.isAutoCreated() && !isAddressBound(address) &&
addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() -
addressInfo.getBindingRemovedTimestamp() >=
settings.getAutoDeleteAddressesDelay());
- }
-
private Stream<Queue> getLocalQueues() {
return addressManager.getBindings()
.filter(binding -> binding.getType() == BindingType.LOCAL_QUEUE)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index bce068e930..3b96de204c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -368,17 +368,18 @@ public class SimpleAddressManager implements
AddressManager {
}
@Override
- public boolean checkAutoRemoveAddress(SimpleString address,
- AddressInfo addressInfo,
- AddressSettings settings) throws
Exception {
- return checkAutoRemoveAddress(address, addressInfo, settings, false);
+ public boolean checkAutoRemoveAddress(AddressInfo addressInfo,
+ AddressSettings settings,
+ boolean ignoreDelay) throws Exception
{
+ return settings.isAutoDeleteAddresses() && addressInfo != null &&
addressInfo.isAutoCreated() &&
!bindingsFactory.isAddressBound(addressInfo.getName()) &&
addressWasUsed(addressInfo, settings) && (ignoreDelay ||
delayCheck(addressInfo, settings));
}
- @Override
- public boolean checkAutoRemoveAddress(SimpleString address,
- AddressInfo addressInfo,
- AddressSettings settings, boolean
ignoreDelay) throws Exception {
- return settings.isAutoDeleteAddresses() && addressInfo != null &&
addressInfo.isAutoCreated() && !bindingsFactory.isAddressBound(address) &&
(ignoreDelay || addressInfo.getBindingRemovedTimestamp() != -1 &&
(System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >=
settings.getAutoDeleteAddressesDelay()));
+ private boolean delayCheck(AddressInfo addressInfo, AddressSettings
settings) {
+ return (!settings.isAutoDeleteAddressesSkipUsageCheck() &&
System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >=
settings.getAutoDeleteAddressesDelay()) ||
(settings.isAutoDeleteAddressesSkipUsageCheck() && System.currentTimeMillis() -
addressInfo.getCreatedTimestamp() >= settings.getAutoDeleteAddressesDelay());
+ }
+
+ private boolean addressWasUsed(AddressInfo addressInfo, AddressSettings
settings) {
+ return addressInfo.getBindingRemovedTimestamp() != -1 ||
settings.isAutoDeleteAddressesSkipUsageCheck();
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 03426d92b8..a67072a8af 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1540,10 +1540,10 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224111, value = "Both 'whitelist' and 'allowlist'
detected. Configuration 'whitelist' is deprecated, please use only the
'allowlist' configuration", level = LogMessage.Level.WARN)
void useOnlyAllowList();
- @LogMessage(id = 224112, value = "Auto removing Queue {} with queueID={} on
address={}", level = LogMessage.Level.INFO)
+ @LogMessage(id = 224112, value = "Auto removing queue {} with queueID={} on
address={}", level = LogMessage.Level.INFO)
void autoRemoveQueue(String name, long queueID, String address);
- @LogMessage(id = 224113, value = "Auto removing Address {}", level =
LogMessage.Level.INFO)
+ @LogMessage(id = 224113, value = "Auto removing address {}", level =
LogMessage.Level.INFO)
void autoRemoveAddress(String name);
@LogMessage(id = 224114, value = "Address control block, blocking message
production on address '{}'. Clients will not get further credit.", level =
LogMessage.Level.INFO)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 81dcf4cbb9..97b9193f3e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -542,4 +542,8 @@ public interface Queue extends Bindable,CriticalComponent {
default QueueConfiguration getQueueConfiguration() {
return null;
}
+
+ default long getCreatedTimestamp() {
+ return -1;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index d6e0ec8127..2e4749a5d6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -73,13 +73,14 @@ public class AddressInfo {
private static final AtomicLongFieldUpdater<AddressInfo>
unRoutedMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");
- private long bindingRemovedTimestamp = -1;
+ private volatile long bindingRemovedTimestamp = -1;
private volatile boolean paused = false;
private PostOffice postOffice;
private StorageManager storageManager;
private HierarchicalRepositoryChangeListener repositoryChangeListener;
+ private long createdTimestamp = -1;
public boolean isSwept() {
return swept;
@@ -96,11 +97,11 @@ public class AddressInfo {
}
public AddressInfo(String name) {
- this(SimpleString.toSimpleString(name),
EnumSet.noneOf(RoutingType.class));
+ this(SimpleString.toSimpleString(name), EMPTY_ROUTING_TYPES);
}
public AddressInfo(SimpleString name) {
- this(name, EnumSet.noneOf(RoutingType.class));
+ this(name, EMPTY_ROUTING_TYPES);
}
/**
@@ -110,6 +111,7 @@ public class AddressInfo {
*/
public AddressInfo(SimpleString name, EnumSet<RoutingType> routingTypes) {
this.name = CompositeAddress.extractAddressName(name);
+ this.createdTimestamp = System.currentTimeMillis();
setRoutingTypes(routingTypes);
}
@@ -120,6 +122,7 @@ public class AddressInfo {
*/
public AddressInfo(SimpleString name, RoutingType routingType) {
this.name = CompositeAddress.extractAddressName(name);
+ this.createdTimestamp = System.currentTimeMillis();
addRoutingType(routingType);
}
@@ -313,6 +316,9 @@ public class AddressInfo {
buff.append("}");
buff.append(", autoCreated=").append(autoCreated);
buff.append(", paused=").append(paused);
+ buff.append(",
bindingRemovedTimestamp=").append(bindingRemovedTimestamp);
+ buff.append(", swept=").append(swept);
+ buff.append(", createdTimestamp=").append(createdTimestamp);
buff.append("]");
return buff.toString();
}
@@ -388,6 +394,7 @@ public class AddressInfo {
}
builder.add("routingTypes", arrayBuilder);
}
+ builder.add("created-timestamp", createdTimestamp);
return builder.build().toString();
}
@@ -412,6 +419,9 @@ public class AddressInfo {
JsonNumber jsonNumber = (JsonNumber)rtValue;
this.addRoutingType(RoutingType.getType((byte)jsonNumber.intValue()));
}
+ } else if (key.equals("created-timestamp")) {
+ JsonNumber jsonLong = (JsonNumber) value;
+ this.createdTimestamp = jsonLong.longValue();
}
}
@@ -427,4 +437,7 @@ public class AddressInfo {
return result;
}
+ public long getCreatedTimestamp() {
+ return createdTimestamp;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 738620b173..e27b89eea2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -347,6 +347,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private volatile long ringSize;
+ private volatile long createdTimestamp = -1;
+
@Override
public boolean isSwept() {
return swept;
@@ -635,6 +637,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
final QueueFactory factory) {
super(server == null ? EmptyCriticalAnalyzer.getInstance() :
server.getCriticalAnalyzer(), CRITICAL_PATHS);
+ this.createdTimestamp = System.currentTimeMillis();
+
this.id = queueConfiguration.getId();
this.address = queueConfiguration.getAddress();
@@ -1603,6 +1607,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
this.ringSize = ringSize;
}
+ @Override
+ public long getCreatedTimestamp() {
+ return createdTimestamp;
+ }
+
public long getMessageCountForRing() {
return (long) pendingMetrics.getMessageCount();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index 535e7d972a..db909bf29c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -65,7 +65,6 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
public static void performAutoDeleteQueue(ActiveMQServer server, Queue
queue) {
SimpleString queueName = queue.getName();
- AddressSettings settings =
server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
if (logger.isDebugEnabled()) {
logger.debug("deleting auto-created queue \"{}\": consumerCount = {};
messageCount = {}; isAutoDelete = {}", queueName, queue.getConsumerCount(),
queue.getMessageCount(), queue.isAutoDelete());
@@ -80,16 +79,12 @@ public class QueueManagerImpl extends ReferenceCounterUtil
implements QueueManag
}
}
- public static boolean isAutoDelete(Queue queue) {
- return queue.isAutoDelete();
- }
-
public static boolean messageCountCheck(Queue queue) {
return queue.getAutoDeleteMessageCount() == -1 ||
queue.getMessageCount() <= queue.getAutoDeleteMessageCount();
}
- public static boolean delayCheck(Queue queue) {
- return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp()
>= queue.getAutoDeleteDelay();
+ public static boolean delayCheck(Queue queue, AddressSettings settings) {
+ return (!settings.getAutoDeleteQueuesSkipUsageCheck() &&
System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >=
queue.getAutoDeleteDelay()) || (settings.getAutoDeleteQueuesSkipUsageCheck() &&
System.currentTimeMillis() - queue.getCreatedTimestamp() >=
queue.getAutoDeleteDelay());
}
public static boolean consumerCountCheck(Queue queue) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index ea9e88c213..202b082902 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -82,6 +82,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
public static final long DEFAULT_AUTO_DELETE_QUEUES_DELAY = 0;
+ public static final boolean DEFAULT_AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK =
false;
+
public static final long DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT = 0;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES =
DeletionPolicy.OFF;
@@ -92,6 +94,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
public static final long DEFAULT_AUTO_DELETE_ADDRESSES_DELAY = 0;
+ public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK
= false;
+
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES =
DeletionPolicy.OFF;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_DIVERTS =
DeletionPolicy.OFF;
@@ -231,6 +235,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
private Long autoDeleteQueuesDelay = null;
+ private Boolean autoDeleteQueuesSkipUsageCheck = null;
+
private Long autoDeleteQueuesMessageCount = null;
private Long defaultRingSize = null;
@@ -245,6 +251,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
private Long autoDeleteAddressesDelay = null;
+ private Boolean autoDeleteAddressesSkipUsageCheck = null;
+
private DeletionPolicy configDeleteAddresses = null;
private DeletionPolicy configDeleteDiverts = null;
@@ -335,10 +343,12 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
this.autoDeleteQueues = other.autoDeleteQueues;
this.autoDeleteCreatedQueues = other.autoDeleteCreatedQueues;
this.autoDeleteQueuesDelay = other.autoDeleteQueuesDelay;
+ this.autoDeleteQueuesSkipUsageCheck =
other.autoDeleteQueuesSkipUsageCheck;
this.configDeleteQueues = other.configDeleteQueues;
this.autoCreateAddresses = other.autoCreateAddresses;
this.autoDeleteAddresses = other.autoDeleteAddresses;
this.autoDeleteAddressesDelay = other.autoDeleteAddressesDelay;
+ this.autoDeleteAddressesSkipUsageCheck =
other.autoDeleteAddressesSkipUsageCheck;
this.configDeleteAddresses = other.configDeleteAddresses;
this.configDeleteDiverts = other.configDeleteDiverts;
this.managementBrowsePageSize = other.managementBrowsePageSize;
@@ -446,6 +456,15 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return this;
}
+ public boolean getAutoDeleteQueuesSkipUsageCheck() {
+ return autoDeleteQueuesSkipUsageCheck != null ?
autoDeleteQueuesSkipUsageCheck :
AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_SKIP_USAGE_CHECK;
+ }
+
+ public AddressSettings setAutoDeleteQueuesSkipUsageCheck(final boolean
autoDeleteQueuesSkipUsageCheck) {
+ this.autoDeleteQueuesSkipUsageCheck = autoDeleteQueuesSkipUsageCheck;
+ return this;
+ }
+
public long getAutoDeleteQueuesMessageCount() {
return autoDeleteQueuesMessageCount != null ?
autoDeleteQueuesMessageCount :
AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT;
}
@@ -491,6 +510,15 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return this;
}
+ public boolean isAutoDeleteAddressesSkipUsageCheck() {
+ return autoDeleteAddressesSkipUsageCheck != null ?
autoDeleteAddressesSkipUsageCheck :
AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES_SKIP_USAGE_CHECK;
+ }
+
+ public AddressSettings setAutoDeleteAddressesSkipUsageCheck(final boolean
autoDeleteAddressesSkipUsageCheck) {
+ this.autoDeleteAddressesSkipUsageCheck =
autoDeleteAddressesSkipUsageCheck;
+ return this;
+ }
+
public DeletionPolicy getConfigDeleteAddresses() {
return configDeleteAddresses != null ? configDeleteAddresses :
AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES;
}
@@ -1151,6 +1179,9 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
if (autoDeleteQueuesDelay == null) {
autoDeleteQueuesDelay = merged.autoDeleteQueuesDelay;
}
+ if (autoDeleteQueuesSkipUsageCheck == null) {
+ autoDeleteQueuesSkipUsageCheck =
merged.autoDeleteQueuesSkipUsageCheck;
+ }
if (autoDeleteQueuesMessageCount == null) {
autoDeleteQueuesMessageCount = merged.autoDeleteQueuesMessageCount;
}
@@ -1166,6 +1197,9 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
if (autoDeleteAddressesDelay == null) {
autoDeleteAddressesDelay = merged.autoDeleteAddressesDelay;
}
+ if (autoDeleteAddressesSkipUsageCheck == null) {
+ autoDeleteAddressesSkipUsageCheck =
merged.autoDeleteAddressesSkipUsageCheck;
+ }
if (configDeleteAddresses == null) {
configDeleteAddresses = merged.configDeleteAddresses;
}
@@ -1535,6 +1569,14 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
pageFullMessagePolicy = null;
}
}
+
+ if (buffer.readableBytes() > 0) {
+ autoDeleteQueuesSkipUsageCheck =
BufferHelper.readNullableBoolean(buffer);
+ }
+
+ if (buffer.readableBytes() > 0) {
+ autoDeleteAddressesSkipUsageCheck =
BufferHelper.readNullableBoolean(buffer);
+ }
}
@Override
@@ -1584,7 +1626,9 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
SimpleString.sizeofNullableString(defaultLastValueKey) +
BufferHelper.sizeOfNullableBoolean(defaultNonDestructive) +
BufferHelper.sizeOfNullableLong(autoDeleteQueuesDelay) +
+ BufferHelper.sizeOfNullableBoolean(autoDeleteQueuesSkipUsageCheck) +
BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay) +
+ BufferHelper.sizeOfNullableBoolean(autoDeleteAddressesSkipUsageCheck)
+
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
BufferHelper.sizeOfNullableInteger(defaultGroupBuckets) +
SimpleString.sizeofNullableString(defaultGroupFirstKey) +
@@ -1755,6 +1799,9 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
buffer.writeNullableSimpleString(pageFullMessagePolicy != null ? new
SimpleString(pageFullMessagePolicy.toString()) : null);
+ BufferHelper.writeNullableBoolean(buffer,
autoDeleteQueuesSkipUsageCheck);
+
+ BufferHelper.writeNullableBoolean(buffer,
autoDeleteAddressesSkipUsageCheck);
}
/* (non-Javadoc)
@@ -1797,11 +1844,13 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoDeleteQueues == null) ? 0 :
autoDeleteQueues.hashCode());
result = prime * result + ((autoDeleteCreatedQueues == null) ? 0 :
autoDeleteCreatedQueues.hashCode());
result = prime * result + ((autoDeleteQueuesDelay == null) ? 0 :
autoDeleteQueuesDelay.hashCode());
+ result = prime * result + ((autoDeleteQueuesSkipUsageCheck == null) ? 0
: autoDeleteQueuesSkipUsageCheck.hashCode());
result = prime * result + ((autoDeleteQueuesMessageCount == null) ? 0 :
autoDeleteQueuesMessageCount.hashCode());
result = prime * result + ((configDeleteQueues == null) ? 0 :
configDeleteQueues.hashCode());
result = prime * result + ((autoCreateAddresses == null) ? 0 :
autoCreateAddresses.hashCode());
result = prime * result + ((autoDeleteAddresses == null) ? 0 :
autoDeleteAddresses.hashCode());
result = prime * result + ((autoDeleteAddressesDelay == null) ? 0 :
autoDeleteAddressesDelay.hashCode());
+ result = prime * result + ((autoDeleteAddressesSkipUsageCheck == null) ?
0 : autoDeleteAddressesSkipUsageCheck.hashCode());
result = prime * result + ((configDeleteAddresses == null) ? 0 :
configDeleteAddresses.hashCode());
result = prime * result + ((configDeleteDiverts == null) ? 0 :
configDeleteDiverts.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 :
managementBrowsePageSize.hashCode());
@@ -2015,6 +2064,11 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return false;
} else if (!autoDeleteQueuesDelay.equals(other.autoDeleteQueuesDelay))
return false;
+ if (autoDeleteQueuesSkipUsageCheck == null) {
+ if (other.autoDeleteQueuesSkipUsageCheck != null)
+ return false;
+ } else if
(!autoDeleteQueuesSkipUsageCheck.equals(other.autoDeleteQueuesSkipUsageCheck))
+ return false;
if (autoDeleteQueuesMessageCount == null) {
if (other.autoDeleteQueuesMessageCount != null)
return false;
@@ -2040,6 +2094,11 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return false;
} else if
(!autoDeleteAddressesDelay.equals(other.autoDeleteAddressesDelay))
return false;
+ if (autoDeleteAddressesSkipUsageCheck == null) {
+ if (other.autoDeleteAddressesSkipUsageCheck != null)
+ return false;
+ } else if
(!autoDeleteAddressesSkipUsageCheck.equals(other.autoDeleteAddressesSkipUsageCheck))
+ return false;
if (configDeleteAddresses == null) {
if (other.configDeleteAddresses != null)
return false;
@@ -2309,6 +2368,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
autoDeleteCreatedQueues +
", autoDeleteQueuesDelay=" +
autoDeleteQueuesDelay +
+ ", autoDeleteQueuesSkipUsageCheck=" +
+ autoDeleteQueuesSkipUsageCheck +
", autoDeleteQueuesMessageCount=" +
autoDeleteQueuesMessageCount +
", configDeleteQueues=" +
@@ -2319,6 +2380,8 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
autoDeleteAddresses +
", autoDeleteAddressesDelay=" +
autoDeleteAddressesDelay +
+ ", autoDeleteAddressesSkipUsageCheck=" +
+ autoDeleteAddressesSkipUsageCheck +
", configDeleteAddresses=" +
configDeleteAddresses +
", configDeleteDiverts=" +
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index a6953148c1..863be22f4c 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -4090,6 +4090,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="auto-delete-queues-skip-usage-check"
type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ whether or not to check that the queue has actually be used
before auto-deleting it
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="config-delete-queues" default="OFF" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@@ -4132,6 +4140,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="auto-delete-addresses-skip-usage-check"
type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ whether or not to check that the address has actually be
used before auto-deleting it
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="config-delete-addresses" default="OFF"
maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 6ee9285a15..a80475b4f2 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -450,7 +450,9 @@ public class FileConfigurationTest extends
ConfigurationImplTest {
assertEquals(true,
conf.getAddressSettings().get("a1").isAutoCreateJmsTopics());
assertEquals(true,
conf.getAddressSettings().get("a1").isAutoDeleteJmsTopics());
assertEquals(0,
conf.getAddressSettings().get("a1").getAutoDeleteQueuesDelay());
+ assertEquals(false,
conf.getAddressSettings().get("a1").getAutoDeleteQueuesSkipUsageCheck());
assertEquals(0,
conf.getAddressSettings().get("a1").getAutoDeleteAddressesDelay());
+ assertEquals(false,
conf.getAddressSettings().get("a1").isAutoDeleteAddressesSkipUsageCheck());
assertEquals(false,
conf.getAddressSettings().get("a1").isDefaultPurgeOnNoConsumers());
assertEquals(Integer.valueOf(5),
conf.getAddressSettings().get("a1").getDefaultMaxConsumers());
assertEquals(RoutingType.ANYCAST,
conf.getAddressSettings().get("a1").getDefaultQueueRoutingType());
@@ -486,7 +488,9 @@ public class FileConfigurationTest extends
ConfigurationImplTest {
assertEquals(false,
conf.getAddressSettings().get("a2").isAutoCreateJmsTopics());
assertEquals(false,
conf.getAddressSettings().get("a2").isAutoDeleteJmsTopics());
assertEquals(500,
conf.getAddressSettings().get("a2").getAutoDeleteQueuesDelay());
+ assertEquals(true,
conf.getAddressSettings().get("a2").getAutoDeleteQueuesSkipUsageCheck());
assertEquals(1000,
conf.getAddressSettings().get("a2").getAutoDeleteAddressesDelay());
+ assertEquals(true,
conf.getAddressSettings().get("a2").isAutoDeleteAddressesSkipUsageCheck());
assertEquals(true,
conf.getAddressSettings().get("a2").isDefaultPurgeOnNoConsumers());
assertEquals(Integer.valueOf(15),
conf.getAddressSettings().get("a2").getDefaultMaxConsumers());
assertEquals(RoutingType.MULTICAST,
conf.getAddressSettings().get("a2").getDefaultQueueRoutingType());
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java
index 2fa55cfdfb..0966b0034b 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeTestAccessor.java
@@ -22,4 +22,9 @@ public class PostOfficeTestAccessor {
postOffice.reapAddresses(false);
}
+ public static void sweepAndReapAddresses(PostOfficeImpl postOffice) {
+ reapAddresses(postOffice);
+ reapAddresses(postOffice);
+ reapAddresses(postOffice);
+ }
}
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 8adcc93cf2..e3980a8bce 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -554,9 +554,11 @@
<auto-create-queues>false</auto-create-queues>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-queues-delay>500</auto-delete-queues-delay>
+
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
<auto-create-addresses>false</auto-create-addresses>
<auto-delete-addresses>false</auto-delete-addresses>
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
+
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 445c1842f6..ef34e4836e 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -71,9 +71,11 @@
<auto-create-queues>false</auto-create-queues>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-queues-delay>500</auto-delete-queues-delay>
+
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
<auto-create-addresses>false</auto-create-addresses>
<auto-delete-addresses>false</auto-delete-addresses>
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
+
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
index 445c1842f6..ef34e4836e 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
@@ -71,9 +71,11 @@
<auto-create-queues>false</auto-create-queues>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-queues-delay>500</auto-delete-queues-delay>
+
<auto-delete-queues-skip-usage-check>true</auto-delete-queues-skip-usage-check>
<auto-create-addresses>false</auto-create-addresses>
<auto-delete-addresses>false</auto-delete-addresses>
<auto-delete-addresses-delay>1000</auto-delete-addresses-delay>
+
<auto-delete-addresses-skip-usage-check>true</auto-delete-addresses-skip-usage-check>
<default-purge-on-no-consumers>true</default-purge-on-no-consumers>
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
diff --git
a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
index 099c45a0ac..d074f766d8 100644
---
a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
+++
b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java
@@ -111,7 +111,7 @@ public class Wait {
public static void assertEquals(Object obj, ObjectCondition condition, long
timeout, long sleepMillis) throws Exception {
- boolean result = waitFor(() -> (obj == condition ||
obj.equals(condition.getObject())), timeout, sleepMillis);
+ boolean result = waitFor(() -> (obj == condition || (obj != null &&
obj.equals(condition.getObject()))), timeout, sleepMillis);
if (!result) {
Assert.assertEquals(obj, condition.getObject());
diff --git a/docs/user-manual/en/address-settings.md
b/docs/user-manual/en/address-settings.md
index 51fdb880b5..cd5347d848 100644
--- a/docs/user-manual/en/address-settings.md
+++ b/docs/user-manual/en/address-settings.md
@@ -44,11 +44,13 @@ that would be found in the `broker.xml` file.
<auto-delete-created-queues>false</auto-delete-created-queues>
<auto-delete-queues-delay>0</auto-delete-queues-delay>
<auto-delete-queues-message-count>0</auto-delete-queues-message-count>
+
<auto-delete-queues-skip-usage-check>false</auto-delete-queues-skip-usage-check>
<config-delete-queues>OFF</config-delete-queues>
<config-delete-diverts>OFF</config-delete-diverts>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-addresses>true</auto-delete-addresses>
<auto-delete-addresses-delay>0</auto-delete-addresses-delay>
+
<auto-delete-addresses-skip-usage-check>false</auto-delete-addresses-skip-usage-check>
<config-delete-addresses>OFF</config-delete-addresses>
<management-browse-page-size>200</management-browse-page-size>
<management-message-attribute-size-limit>256</management-message-attribute-size-limit>
@@ -264,6 +266,15 @@ less than or equal to before deleting auto-created queues.
To disable message count check `-1` can be set.
Default is `0` (empty queue).
+`auto-delete-queues-skip-usage-check`. A queue will only be auto-deleted by
+default if it has actually been "used." A queue is considered "used" if any
+messages have been sent to it or any consumers have connected to it during its
+life. However, there are use-cases where it's useful to skip this check. When
+set to `true` it is **imperative** to also set `auto-delete-queues-delay` to a
+value greater than `0` otherwise queues may be deleted almost immediately after
+being created. In this case the queue will be deleted based on when it was
+created rather then when it was last "used." Default is `false`.
+
**Note:** the above auto-delete address settings can also be configured
individually at the queue level when a client auto creates the queue.
@@ -296,6 +307,15 @@ is `0` (delete immediately). The broker's
`address-queue-scan-period` controls
how often (in milliseconds) addresses are scanned for potential deletion. Use
`-1` to disable scanning. The default scan value is `30000`.
+`auto-delete-addresses-skip-usage-check`. An address will only be
auto-deleted by
+default if it has actually been "used." An address is considered "used" if any
+queues have been created on it during its life. However, there are use-cases
+where it's useful to skip this check. When set to `true` it is **imperative**
to
+also set `auto-delete-addresses-delay` to a value greater than `0` otherwise
+addresses may be deleted almost immediately after being created. In this case
+the address will be deleted based on when it was created rather then when it
was
+last "used." Default is `false`.
+
`config-delete-addresses`. How the broker should handle addresses deleted on
config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more
about [configuration reload](config-reload.md).
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
index 0079997893..b215713153 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
@@ -16,14 +16,27 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +56,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
super.setUp();
locator = createInVMNonHALocator();
server = createServer(false);
- server.getConfiguration().setAddressQueueScanPeriod(10);
+ server.getConfiguration().setAddressQueueScanPeriod(0);
server.start();
cf = createSessionFactory(locator);
@@ -55,6 +68,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
server.createQueue(new
QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
Wait.assertTrue(() -> server.getAddressInfo(addressA) == null);
}
@@ -64,6 +78,68 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
server.createQueue(new
QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close();
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ assertNotNull(server.getAddressInfo(addressA));
+ }
+
+ @Test
+ public void testAutoDeleteAutoCreatedAddressSkipUsageCheckWithDelay()
throws Exception {
+ final long DELAY = 1500;
+ server.getAddressSettingsRepository().addMatch(addressA.toString(), new
AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true).setAutoDeleteAddressesDelay(DELAY));
+ server.addAddressInfo(new AddressInfo(addressA).setAutoCreated(true));
+ long start = System.currentTimeMillis();
assertNotNull(server.getAddressInfo(addressA));
+ while (System.currentTimeMillis() - start <= DELAY) {
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ assertNotNull(server.getAddressInfo(addressA));
+ Thread.sleep(100);
+ }
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ assertNull(server.getAddressInfo(addressA));
+ }
+
+ @Test
+ public void testAutoDeleteAddressWithWildcardAddress() throws Exception {
+ String prefix = "topic";
+ server.getAddressSettingsRepository().addMatch(prefix + ".#", new
AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true));
+ String wildcardAddress = prefix + ".#";
+ String queue = RandomUtil.randomString();
+ final int MESSAGE_COUNT = 10;
+ final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
+
+ server.createQueue(new
QueueConfiguration(queue).setAddress(wildcardAddress).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
+
+ ClientSession consumerSession = cf.createSession();
+ ClientConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageHandler(message -> latch.countDown());
+ consumerSession.start();
+
+ ClientSession producerSession = cf.createSession();
+ ClientProducer producer = producerSession.createProducer();
+
+ List<String> addresses = new ArrayList<>();
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String address = prefix + "." + RandomUtil.randomString();
+ addresses.add(address);
+ server.addAddressInfo(new AddressInfo(address).setAutoCreated(true));
+ producer.send(address, producerSession.createMessage(false));
+ }
+ producerSession.close();
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+ for (String address : addresses) {
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
+ Wait.assertEquals(true, () ->
Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.toSimpleString(address)),
2000, 100);
+ }
+
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+
+ for (String address : addresses) {
+
assertNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
+ Wait.assertEquals(false, () ->
Arrays.asList(server.getPagingManager().getStoreNames()).contains(SimpleString.toSimpleString(address)),
2000, 100);
+ }
+
+ consumerSession.close();
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
index 87c75b844b..e1a7337504 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
@@ -17,13 +17,15 @@
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
@@ -101,4 +103,29 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
assertNotNull(server.locateQueue(queueA));
assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000,
100));
}
+
+ @Test
+ public void testAutoDeleteAutoCreatedQueueWithoutUsage() throws Exception {
+ server.getAddressSettingsRepository().addMatch(addressA.toString(), new
AddressSettings().setAutoDeleteQueuesSkipUsageCheck(true));
+ server.createQueue(new
QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
+ assertNotNull(server.locateQueue(queueA));
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
+ }
+
+ @Test
+ public void testAutoDeleteAutoCreatedQueueWithoutUsageWithDelay() throws
Exception {
+ final long DELAY = 1500;
+ server.getAddressSettingsRepository().addMatch(addressA.toString(), new
AddressSettings().setAutoDeleteQueuesSkipUsageCheck(true).setAutoDeleteQueuesDelay(DELAY));
+ server.createQueue(new
QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).setAutoCreated(true));
+ long start = System.currentTimeMillis();
+ assertNotNull(server.locateQueue(queueA));
+ while (System.currentTimeMillis() - start <= DELAY) {
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ assertNotNull(server.locateQueue(queueA));
+ Thread.sleep(100);
+ }
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+ assertNull(server.locateQueue(queueA));
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 5679ae8168..dd09a99461 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -30,6 +30,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
@@ -447,4 +449,45 @@ public class MQTT5Test extends MQTT5TestSupport {
// verify the shared subscription queue is removed after all the
subscribers disconnect
Wait.assertTrue(() ->
server.locateQueue(SUB_NAME.concat(".").concat(TOPIC)) == null, 2000, 100);
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testAutoDeleteAddressWithWildcardSubscription() throws
Exception {
+ String prefix = "topic";
+ server.getAddressSettingsRepository().addMatch(prefix + ".#", new
AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true));
+ String topic = prefix + "/#";
+ final int MESSAGE_COUNT = 100;
+ final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
+
+ MqttClient consumer = createPahoClient("consumer");
+ consumer.connect();
+ consumer.subscribe(topic, AT_LEAST_ONCE);
+ consumer.setCallback(new LatchedMqttCallback(latch));
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+
+ List<String> addresses = new ArrayList<>();
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ String address = prefix + "/" + RandomUtil.randomString();
+ addresses.add(address.replace('/', '.'));
+ producer.publish(address, new MqttMessage());
+ }
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+
+ for (String address : addresses) {
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
+ }
+
+ PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl)
server.getPostOffice());
+
+ for (String address : addresses) {
+
assertNull(server.getAddressInfo(SimpleString.toSimpleString(address)));
+ }
+
+ consumer.disconnect();
+ consumer.close();
+ }
}