This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 24521797b2 ARTEMIS-5119 Expired Messages on Cluster SNF should to to
the original Expiry Queue
24521797b2 is described below
commit 24521797b2bd42a06967ab7848e0e5b57de09cb2
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Oct 30 12:49:31 2024 -0400
ARTEMIS-5119 Expired Messages on Cluster SNF should to to the original
Expiry Queue
---
.../artemis/core/server/ActiveMQServer.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 25 +++
.../artemis/core/server/impl/QueueImpl.java | 181 +++++++++++----------
.../cluster/expiry/ClusteredExpiryTest.java | 118 ++++++++++++++
.../integration/server/ExpireQueueSuffixTest.java | 101 ++++++++++++
5 files changed, 345 insertions(+), 84 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e491706d80..d35cad4304 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -682,6 +682,10 @@ public interface ActiveMQServer extends ServiceComponent {
return locateQueue(SimpleString.of(queueName));
}
+ default Queue locateQueue(String address, String queue) throws Exception {
+ return null;
+ }
+
default BindingQueryResult bindingQuery(SimpleString address) throws
Exception {
return bindingQuery(address, true);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 287db949c3..8a1956f2b5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -111,6 +111,7 @@ import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -2385,6 +2386,30 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
return (Queue) binding.getBindable();
}
+ @Override
+ public Queue locateQueue(String address, String queue) throws Exception {
+ Bindings bindings =
postOffice.getBindingsForAddress(SimpleString.of(address));
+ if (bindings == null) {
+ return null;
+ }
+
+ Binding binding = bindings.getBinding(queue);
+ if (binding == null) {
+ return null;
+ }
+
+ Bindable bindingContent = binding.getBindable();
+
+ if (!(bindingContent instanceof Queue)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("localQueue({}. {}) found non Queue ( {} ) on binding
table, returning null instead", address, queue, bindingContent);
+ }
+ return null;
+ }
+
+ return (Queue) bindingContent;
+ }
+
@Deprecated
@Override
public Queue deployQueue(final SimpleString address,
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 06876e8dad..b3a57d1af8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -274,7 +274,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private final StorageManager storageManager;
- private volatile AddressSettings addressSettings;
+ // Instead of looking up the AddressSettings every time, we cache and
monitor it through onChange
+ private volatile AddressSettings cachedAddressSettings;
private final ActiveMQServer server;
@@ -733,9 +734,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (addressSettingsRepository != null) {
addressSettingsRepositoryListener = new
AddressSettingsRepositoryListener(addressSettingsRepository);
addressSettingsRepository.registerListener(addressSettingsRepositoryListener);
- this.addressSettings =
addressSettingsRepository.getMatch(getAddressSettingsMatch());
+ this.cachedAddressSettings =
addressSettingsRepository.getMatch(getAddressSettingsMatch());
} else {
- this.addressSettings = new AddressSettings();
+ this.cachedAddressSettings = new AddressSettings();
}
if (pageSubscription != null) {
@@ -757,9 +758,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
this.ringSize = queueConfiguration.getRingSize() == null ?
ActiveMQDefaultConfiguration.getDefaultRingSize() :
queueConfiguration.getRingSize();
- this.initialQueueBufferSize =
this.addressSettings.getInitialQueueBufferSize() == null
+ this.initialQueueBufferSize =
this.cachedAddressSettings.getInitialQueueBufferSize() == null
? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
- : this.addressSettings.getInitialQueueBufferSize();
+ : this.cachedAddressSettings.getInitialQueueBufferSize();
this.intermediateMessageReferences = new
MpscUnboundedArrayQueue<>(initialQueueBufferSize);
}
@@ -2129,36 +2130,92 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
* hence no information about delivering statistics should be
updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer
consumer, boolean delivering) throws Exception {
- if (addressSettings.getExpiryAddress() != null) {
- createExpiryResources();
+ expire(null, ref, consumer, delivering);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("moving expired reference {} to address = {} from
queue={}", ref, addressSettings.getExpiryAddress(), name);
+ private void expire(final Transaction tx, final MessageReference ref, final
ServerConsumer consumer, boolean delivering) throws Exception {
+ AddressSettings settingsToUse =
getMessageAddressSettings(ref.getMessage());
+ SimpleString expiryAddress = settingsToUse.getExpiryAddress();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("expire on {}/{}, consumer={}, expiryAddress={}",
this.address, this.name, consumer, expiryAddress);
+ }
+
+ if (expiryAddress != null && expiryAddress.length() != 0) {
+ String messageAddress = ref.getMessage().getAddress();
+
+ if (messageAddress == null) {
+ // in the unlikely event where a message does not have an address
stored on the message itself,
+ // we will get the address from the current queue
+ messageAddress = String.valueOf(getAddress());
}
+ createExpiryResources(messageAddress, settingsToUse);
- move(null, addressSettings.getExpiryAddress(), null, ref, false,
AckReason.EXPIRED, consumer, null, delivering);
+ Bindings bindingList =
postOffice.lookupBindingsForAddress(expiryAddress);
+
+ if (bindingList == null || bindingList.getBindings().isEmpty()) {
+ if (!printErrorExpiring) {
+ // print this only once
+
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
+ printErrorExpiring = true;
+ }
+ acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
+ } else {
+ move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED,
consumer, null, delivering);
+ }
} else {
- logger.trace("expiry is null, just acking expired message for
reference {} from queue={}", ref, name);
+ if (!printErrorExpiring) {
+ printErrorExpiring = true;
+ // print this only once
+ ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
+ }
- acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering);
+ acknowledge(tx, ref, AckReason.EXPIRED, consumer, delivering);
}
// potentially auto-delete this queue if this expired the last message
refCountForConsumers.check();
if (server != null && server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
addressSettings.getExpiryAddress(), consumer));
+ if (tx == null) {
+ server.callBrokerMessagePlugins(plugin ->
plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
+ } else {
+ ExpiryLogger expiryLogger = (ExpiryLogger)
tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
+ if (expiryLogger == null) {
+ expiryLogger = new ExpiryLogger();
+ tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER,
expiryLogger);
+ tx.addOperation(expiryLogger);
+ }
+
+ expiryLogger.addExpiry(address, ref);
+
+ // potentially auto-delete this queue if this expired the last
message
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ refCountForConsumers.check();
+ }
+ });
+ }
+ }
+ }
+
+ private AddressSettings getMessageAddressSettings(Message message) {
+ if (message.getAddress() == null ||
message.getAddress().equals(String.valueOf(address))) {
+ return cachedAddressSettings;
+ } else {
+ return
server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}
@Override
public SimpleString getExpiryAddress() {
- return this.addressSettings.getExpiryAddress();
+ return this.cachedAddressSettings.getExpiryAddress();
}
@Override
public SimpleString getDeadLetterAddress() {
- return this.addressSettings.getDeadLetterAddress();
+ return this.cachedAddressSettings.getDeadLetterAddress();
}
@Override
@@ -2510,7 +2567,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering(ref);
- expire(tx, ref, true);
+ expire(tx, ref, null, true);
iter.remove();
refRemoved(ref);
count++;
@@ -2543,7 +2600,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
private boolean isExpiryDisabled() {
- final SimpleString expiryAddress = addressSettings.getExpiryAddress();
+ final SimpleString expiryAddress =
cachedAddressSettings.getExpiryAddress();
if (expiryAddress != null && expiryAddress.equals(this.address)) {
// check expire with itself would be silly (waste of time)
logger.trace("Redundant expiration from {} to {}", address,
expiryAddress);
@@ -2637,7 +2694,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
final Transaction tx = new TransactionImpl(storageManager);
for (MessageReference ref : expiredMessages) {
try {
- expire(tx, ref, true);
+ expire(tx, ref, null, true);
refRemoved(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(ref, e);
@@ -3598,23 +3655,23 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
storageManager.updateDeliveryCount(reference);
}
- int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
+ int maxDeliveries = cachedAddressSettings.getMaxDeliveryAttempts();
int deliveryCount = reference.getDeliveryCount();
// First check DLA
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
if (logger.isTraceEnabled()) {
logger.trace("Sending reference {} to DLA = {} since
ref.getDeliveryCount={} and maxDeliveries={} from queue={}",
- reference, addressSettings.getDeadLetterAddress(),
reference.getDeliveryCount(), maxDeliveries, name);
+ reference,
cachedAddressSettings.getDeadLetterAddress(), reference.getDeliveryCount(),
maxDeliveries, name);
}
- boolean dlaResult = sendToDeadLetterAddress(null, reference,
addressSettings.getDeadLetterAddress());
+ boolean dlaResult = sendToDeadLetterAddress(null, reference,
cachedAddressSettings.getDeadLetterAddress());
return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
- long redeliveryDelay = addressSettings.getRedeliveryDelay();
+ long redeliveryDelay = cachedAddressSettings.getRedeliveryDelay();
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
- redeliveryDelay = calculateRedeliveryDelay(addressSettings,
deliveryCount);
+ redeliveryDelay = calculateRedeliveryDelay(cachedAddressSettings,
deliveryCount);
if (logger.isTraceEnabled()) {
logger.trace("Setting redeliveryDelay={} on reference={}",
redeliveryDelay, reference);
@@ -3896,51 +3953,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}
- private void expire(final Transaction tx, final MessageReference ref,
boolean delivering) throws Exception {
- SimpleString expiryAddress = addressSettings.getExpiryAddress();
-
- if (expiryAddress != null && expiryAddress.length() != 0) {
-
- createExpiryResources();
-
- Bindings bindingList =
postOffice.lookupBindingsForAddress(expiryAddress);
-
- if (bindingList == null || bindingList.getBindings().isEmpty()) {
-
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
- acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
- } else {
- move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null,
null, delivering);
- }
- } else {
- if (!printErrorExpiring) {
- printErrorExpiring = true;
- // print this only once
- ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
- }
-
- acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
- }
-
- if (server != null && server.hasBrokerMessagePlugins()) {
- ExpiryLogger expiryLogger =
(ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
- if (expiryLogger == null) {
- expiryLogger = new ExpiryLogger();
- tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER,
expiryLogger);
- tx.addOperation(expiryLogger);
- }
-
- expiryLogger.addExpiry(address, ref);
- }
-
- // potentially auto-delete this queue if this expired the last message
- tx.addOperation(new TransactionOperationAbstract() {
- @Override
- public void afterCommit(Transaction tx) {
- refCountForConsumers.check();
- }
- });
- }
-
private class ExpiryLogger extends TransactionOperationAbstract {
List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
@@ -3964,7 +3976,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public boolean sendToDeadLetterAddress(final Transaction tx, final
MessageReference ref) throws Exception {
- return sendToDeadLetterAddress(tx, ref,
addressSettings.getDeadLetterAddress());
+ return sendToDeadLetterAddress(tx, ref,
cachedAddressSettings.getDeadLetterAddress());
}
private boolean sendToDeadLetterAddress(final Transaction tx,
@@ -3999,22 +4011,23 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getAddress().toString());
- createResources(addressSettings.isAutoCreateDeadLetterResources(),
addressSettings.getDeadLetterAddress(),
addressSettings.getDeadLetterQueuePrefix(),
addressSettings.getDeadLetterQueueSuffix());
+ createResources(String.valueOf(getAddress()),
addressSettings.isAutoCreateDeadLetterResources(),
addressSettings.getDeadLetterAddress(),
addressSettings.getDeadLetterQueuePrefix(),
addressSettings.getDeadLetterQueueSuffix());
}
- private void createExpiryResources() throws Exception {
- AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getAddress().toString());
- createResources(addressSettings.isAutoCreateExpiryResources(),
addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(),
addressSettings.getExpiryQueueSuffix());
+ private void createExpiryResources(String address, AddressSettings
messageAddressSettings) throws Exception {
+ createResources(address,
messageAddressSettings.isAutoCreateExpiryResources(),
messageAddressSettings.getExpiryAddress(),
messageAddressSettings.getExpiryQueuePrefix(),
messageAddressSettings.getExpiryQueueSuffix());
}
- private void createResources(boolean isAutoCreate, SimpleString
destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
- if (isAutoCreate && !getAddress().equals(destinationAddress)) {
+ private void createResources(String address, boolean isAutoCreate,
SimpleString destinationAddress, SimpleString prefix, SimpleString suffix)
throws Exception {
+ if (isAutoCreate && !address.equals(destinationAddress)) {
if (destinationAddress != null && destinationAddress.length() != 0) {
- SimpleString destinationQueueName =
prefix.concat(getAddress()).concat(suffix);
- SimpleString filter = SimpleString.of(String.format("%s = '%s'",
Message.HDR_ORIGINAL_ADDRESS, getAddress()));
+ SimpleString destinationQueueName =
prefix.concat(address).concat(suffix);
+ SimpleString filter = SimpleString.of(String.format("%s = '%s'",
Message.HDR_ORIGINAL_ADDRESS, address));
try {
+ logger.debug("Creating Resource queue {}",
destinationQueueName);
server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true),
true);
} catch (ActiveMQQueueExistsException e) {
+ logger.debug("resource {} already existed, ignoring outcome",
destinationQueueName);
// ignore
}
}
@@ -4770,7 +4783,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
private void configureSlowConsumerReaper() {
- if (addressSettings == null ||
addressSettings.getSlowConsumerThreshold() ==
AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
+ if (cachedAddressSettings == null ||
cachedAddressSettings.getSlowConsumerThreshold() ==
AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
@@ -4780,13 +4793,13 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
} else {
if (slowConsumerReaperRunnable == null) {
- scheduleSlowConsumerReaper(addressSettings);
- } else if (slowConsumerReaperRunnable.checkPeriod !=
addressSettings.getSlowConsumerCheckPeriod() ||
slowConsumerReaperRunnable.thresholdInMsgPerSecond !=
addressSettings.getSlowConsumerThreshold() ||
!slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy()))
{
+ scheduleSlowConsumerReaper(cachedAddressSettings);
+ } else if (slowConsumerReaperRunnable.checkPeriod !=
cachedAddressSettings.getSlowConsumerCheckPeriod() ||
slowConsumerReaperRunnable.thresholdInMsgPerSecond !=
cachedAddressSettings.getSlowConsumerThreshold() ||
!slowConsumerReaperRunnable.policy.equals(cachedAddressSettings.getSlowConsumerPolicy()))
{
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
}
- scheduleSlowConsumerReaper(addressSettings);
+ scheduleSlowConsumerReaper(cachedAddressSettings);
}
}
}
@@ -4847,7 +4860,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public void onChange() {
- addressSettings =
addressSettingsRepository.getMatch(getAddressSettingsMatch());
+ cachedAddressSettings =
addressSettingsRepository.getMatch(getAddressSettingsMatch());
checkDeadLetterAddressAndExpiryAddress();
configureSlowConsumerReaper();
}
@@ -4863,10 +4876,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private void checkDeadLetterAddressAndExpiryAddress() {
if (!Env.isTestEnv() && !internalQueue &&
!address.equals(server.getConfiguration().getManagementNotificationAddress())) {
- if (addressSettings.getDeadLetterAddress() == null) {
+ if (cachedAddressSettings.getDeadLetterAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
}
- if (addressSettings.getExpiryAddress() == null) {
+ if (cachedAddressSettings.getExpiryAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
new file mode 100644
index 0000000000..1659cd88f9
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ Queue snfPaused;
+
+ @Test
+ public void testExpiryOnSNF() throws Exception {
+ setupServer(0, true, true);
+ setupServer(1, true, true);
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+ servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+ startServers(0, 1);
+
+ final String queuesPrefix = "queues.";
+ final String queueName = queuesPrefix + getName();
+ final String expirySuffix = ".Expiry";
+ final String expiryPrefix = "myEXP.";
+ final String expiryAddress = "ExpiryAddress";
+ final String resultingExpiryQueue = expiryPrefix + queueName +
expirySuffix;
+
+ servers[0].getAddressSettingsRepository().clear();
+ servers[0].getAddressSettingsRepository().addMatch(queuesPrefix + "#",
new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+ servers[0].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ servers[1].getAddressSettingsRepository().clear();
+ servers[1].getAddressSettingsRepository().addMatch(queuesPrefix + "#",
new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+ servers[1].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ Queue serverQueue0 =
servers[0].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
servers[1].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+ waitForBindings(0, queueName, 1, 0, true);
+ waitForBindings(1, queueName, 1, 0, true);
+
+ waitForBindings(0, queueName, 1, 0, false);
+ waitForBindings(1, queueName, 1, 0, false);
+
+ // pausing the SNF queue to keep messages stuck on the queue
+ servers[0].getPostOffice().getAllBindings().filter(f ->
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+ assertNotNull(snfPaused);
+
+ long NUMBER_OF_MESSAGES = 100;
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session1 = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session1.createProducer(session1.createQueue(queueName));
+ producer.setTimeToLive(100);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session1.createTextMessage("hello"));
+ }
+ session1.commit();
+ }
+ Wait.assertEquals(0L, serverQueue0::getMessageCount, 5_000, 100);
+ Wait.assertEquals(0L, snfPaused::getMessageCount, 5_000, 100);
+ Queue expiryQueue = servers[0].locateQueue(expiryAddress,
resultingExpiryQueue);
+ assertNotNull(expiryQueue);
+ Wait.assertEquals(NUMBER_OF_MESSAGES, expiryQueue::getMessageCount,
5_000, 100);
+
+ }
+
+ private void pauseQueue(Binding binding) {
+ assertNull(snfPaused);
+ if (binding instanceof LocalQueueBinding) {
+ logger.info("Pausing {}", binding.getUniqueName());
+ snfPaused = ((LocalQueueBinding) binding).getQueue();
+ snfPaused.pause();
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
new file mode 100644
index 0000000000..9193f7f64e
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ExpireQueueSuffixTest extends ActiveMQTestBase {
+
+ public final SimpleString queueA = SimpleString.of("queueA");
+ public final SimpleString queueB = SimpleString.of("queueB");
+ public final SimpleString expiryAddress = SimpleString.of("myExpiry");
+
+ public final SimpleString expirySuffix = SimpleString.of(".expSuffix");
+ public final long EXPIRY_DELAY = 10L;
+
+ private ActiveMQServer server;
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+
server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L);
+
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix));
+
+ server.start();
+
+
server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST));
+
server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ @Test
+ public void testAutoCreationOfExpiryResources() throws Exception {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ long sendA = 7;
+ long sendB = 11;
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueA.toString()));
+ producer.setTimeToLive(100);
+
+ for (int i = 0; i < sendA; i++) {
+ producer.send(session.createTextMessage("queueA"));
+ }
+ session.commit();
+
+ producer =
session.createProducer(session.createQueue(queueB.toString()));
+ producer.setTimeToLive(100);
+ for (int i = 0; i < sendB; i++) {
+ producer.send(session.createTextMessage("queueB"));
+ }
+ session.commit();
+ }
+
+ Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." +
queueA + expirySuffix) != null, 5000);
+ Queue expA = server.locateQueue(expiryAddress.toString(), "EXP." +
queueA + expirySuffix);
+ assertNotNull(expA);
+
+ Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." +
queueB + expirySuffix) != null, 5000);
+ Queue expB = server.locateQueue(expiryAddress.toString(), "EXP." +
queueB + expirySuffix);
+ assertNotNull(expB);
+
+ Wait.assertEquals(sendA, expA::getMessageCount, 5000, 100);
+ Wait.assertEquals(sendB, expB::getMessageCount, 5000, 100);
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact