This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 24521797b2 ARTEMIS-5119 Expired Messages on Cluster SNF should to to 
the original Expiry Queue
24521797b2 is described below

commit 24521797b2bd42a06967ab7848e0e5b57de09cb2
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Oct 30 12:49:31 2024 -0400

    ARTEMIS-5119 Expired Messages on Cluster SNF should to to the original 
Expiry Queue
---
 .../artemis/core/server/ActiveMQServer.java        |   4 +
 .../core/server/impl/ActiveMQServerImpl.java       |  25 +++
 .../artemis/core/server/impl/QueueImpl.java        | 181 +++++++++++----------
 .../cluster/expiry/ClusteredExpiryTest.java        | 118 ++++++++++++++
 .../integration/server/ExpireQueueSuffixTest.java  | 101 ++++++++++++
 5 files changed, 345 insertions(+), 84 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e491706d80..d35cad4304 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -682,6 +682,10 @@ public interface ActiveMQServer extends ServiceComponent {
       return locateQueue(SimpleString.of(queueName));
    }
 
+   default Queue locateQueue(String address, String queue) throws Exception {
+      return null;
+   }
+
    default BindingQueryResult bindingQuery(SimpleString address) throws 
Exception {
       return bindingQuery(address, true);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 287db949c3..8a1956f2b5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -111,6 +111,7 @@ import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
 import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -2385,6 +2386,30 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
       return (Queue) binding.getBindable();
    }
 
+   @Override
+   public Queue locateQueue(String address, String queue) throws Exception {
+      Bindings bindings = 
postOffice.getBindingsForAddress(SimpleString.of(address));
+      if (bindings == null) {
+         return null;
+      }
+
+      Binding binding = bindings.getBinding(queue);
+      if (binding == null) {
+         return null;
+      }
+
+      Bindable bindingContent = binding.getBindable();
+
+      if (!(bindingContent instanceof Queue)) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("localQueue({}. {}) found non Queue ( {} ) on binding 
table, returning null instead", address, queue, bindingContent);
+         }
+         return null;
+      }
+
+      return (Queue) bindingContent;
+   }
+
    @Deprecated
    @Override
    public Queue deployQueue(final SimpleString address,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 06876e8dad..b3a57d1af8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -274,7 +274,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private final StorageManager storageManager;
 
-   private volatile AddressSettings addressSettings;
+   // Instead of looking up the AddressSettings every time, we cache and 
monitor it through onChange
+   private volatile AddressSettings cachedAddressSettings;
 
    private final ActiveMQServer server;
 
@@ -733,9 +734,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       if (addressSettingsRepository != null) {
          addressSettingsRepositoryListener = new 
AddressSettingsRepositoryListener(addressSettingsRepository);
          
addressSettingsRepository.registerListener(addressSettingsRepositoryListener);
-         this.addressSettings = 
addressSettingsRepository.getMatch(getAddressSettingsMatch());
+         this.cachedAddressSettings = 
addressSettingsRepository.getMatch(getAddressSettingsMatch());
       } else {
-         this.addressSettings = new AddressSettings();
+         this.cachedAddressSettings = new AddressSettings();
       }
 
       if (pageSubscription != null) {
@@ -757,9 +758,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       this.ringSize = queueConfiguration.getRingSize() == null ? 
ActiveMQDefaultConfiguration.getDefaultRingSize() : 
queueConfiguration.getRingSize();
 
-      this.initialQueueBufferSize = 
this.addressSettings.getInitialQueueBufferSize() == null
+      this.initialQueueBufferSize = 
this.cachedAddressSettings.getInitialQueueBufferSize() == null
               ? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
-              : this.addressSettings.getInitialQueueBufferSize();
+              : this.cachedAddressSettings.getInitialQueueBufferSize();
       this.intermediateMessageReferences = new 
MpscUnboundedArrayQueue<>(initialQueueBufferSize);
    }
 
@@ -2129,36 +2130,92 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
     *           hence no information about delivering statistics should be 
updated. */
    @Override
    public void expire(final MessageReference ref, final ServerConsumer 
consumer, boolean delivering) throws Exception {
-      if (addressSettings.getExpiryAddress() != null) {
-         createExpiryResources();
+      expire(null, ref, consumer, delivering);
+   }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("moving expired reference {} to address = {} from 
queue={}", ref, addressSettings.getExpiryAddress(), name);
+   private void expire(final Transaction tx, final MessageReference ref, final 
ServerConsumer consumer, boolean delivering) throws Exception {
+      AddressSettings settingsToUse = 
getMessageAddressSettings(ref.getMessage());
+      SimpleString expiryAddress = settingsToUse.getExpiryAddress();
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("expire on {}/{}, consumer={}, expiryAddress={}", 
this.address, this.name, consumer, expiryAddress);
+      }
+
+      if (expiryAddress != null && expiryAddress.length() != 0) {
+         String messageAddress = ref.getMessage().getAddress();
+
+         if (messageAddress == null) {
+            // in the unlikely event where a message does not have an address 
stored on the message itself,
+            // we will get the address from the current queue
+            messageAddress = String.valueOf(getAddress());
          }
+         createExpiryResources(messageAddress, settingsToUse);
 
-         move(null, addressSettings.getExpiryAddress(), null, ref, false, 
AckReason.EXPIRED, consumer, null, delivering);
+         Bindings bindingList = 
postOffice.lookupBindingsForAddress(expiryAddress);
+
+         if (bindingList == null || bindingList.getBindings().isEmpty()) {
+            if (!printErrorExpiring) {
+               // print this only once
+               
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
+               printErrorExpiring = true;
+            }
+            acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
+         } else {
+            move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, 
consumer, null, delivering);
+         }
       } else {
-         logger.trace("expiry is null, just acking expired message for 
reference {} from queue={}", ref, name);
+         if (!printErrorExpiring) {
+            printErrorExpiring = true;
+            // print this only once
+            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
+         }
 
-         acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering);
+         acknowledge(tx, ref, AckReason.EXPIRED, consumer, delivering);
       }
 
       // potentially auto-delete this queue if this expired the last message
       refCountForConsumers.check();
 
       if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, 
addressSettings.getExpiryAddress(), consumer));
+         if (tx == null) {
+            server.callBrokerMessagePlugins(plugin -> 
plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
+         } else {
+            ExpiryLogger expiryLogger = (ExpiryLogger) 
tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
+            if (expiryLogger == null) {
+               expiryLogger = new ExpiryLogger();
+               tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, 
expiryLogger);
+               tx.addOperation(expiryLogger);
+            }
+
+            expiryLogger.addExpiry(address, ref);
+
+            // potentially auto-delete this queue if this expired the last 
message
+            tx.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterCommit(Transaction tx) {
+                  refCountForConsumers.check();
+               }
+            });
+         }
+      }
+   }
+
+   private AddressSettings getMessageAddressSettings(Message message) {
+      if (message.getAddress() == null || 
message.getAddress().equals(String.valueOf(address))) {
+         return cachedAddressSettings;
+      } else {
+         return 
server.getAddressSettingsRepository().getMatch(message.getAddress());
       }
    }
 
    @Override
    public SimpleString getExpiryAddress() {
-      return this.addressSettings.getExpiryAddress();
+      return this.cachedAddressSettings.getExpiryAddress();
    }
 
    @Override
    public SimpleString getDeadLetterAddress() {
-      return this.addressSettings.getDeadLetterAddress();
+      return this.cachedAddressSettings.getDeadLetterAddress();
    }
 
    @Override
@@ -2510,7 +2567,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
                incDelivering(ref);
-               expire(tx, ref, true);
+               expire(tx, ref, null, true);
                iter.remove();
                refRemoved(ref);
                count++;
@@ -2543,7 +2600,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    private boolean isExpiryDisabled() {
-      final SimpleString expiryAddress = addressSettings.getExpiryAddress();
+      final SimpleString expiryAddress = 
cachedAddressSettings.getExpiryAddress();
       if (expiryAddress != null && expiryAddress.equals(this.address)) {
          // check expire with itself would be silly (waste of time)
          logger.trace("Redundant expiration from {} to {}", address, 
expiryAddress);
@@ -2637,7 +2694,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             final Transaction tx = new TransactionImpl(storageManager);
             for (MessageReference ref : expiredMessages) {
                try {
-                  expire(tx, ref, true);
+                  expire(tx, ref, null, true);
                   refRemoved(ref);
                } catch (Exception e) {
                   
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(ref, e);
@@ -3598,23 +3655,23 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          storageManager.updateDeliveryCount(reference);
       }
 
-      int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
+      int maxDeliveries = cachedAddressSettings.getMaxDeliveryAttempts();
       int deliveryCount = reference.getDeliveryCount();
 
       // First check DLA
       if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
          if (logger.isTraceEnabled()) {
             logger.trace("Sending reference {} to DLA = {} since 
ref.getDeliveryCount={} and maxDeliveries={} from queue={}",
-                         reference, addressSettings.getDeadLetterAddress(), 
reference.getDeliveryCount(), maxDeliveries, name);
+                         reference, 
cachedAddressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), 
maxDeliveries, name);
          }
-         boolean dlaResult = sendToDeadLetterAddress(null, reference, 
addressSettings.getDeadLetterAddress());
+         boolean dlaResult = sendToDeadLetterAddress(null, reference, 
cachedAddressSettings.getDeadLetterAddress());
 
          return new Pair<>(false, dlaResult);
       } else {
          // Second check Redelivery Delay
-         long redeliveryDelay = addressSettings.getRedeliveryDelay();
+         long redeliveryDelay = cachedAddressSettings.getRedeliveryDelay();
          if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
-            redeliveryDelay = calculateRedeliveryDelay(addressSettings, 
deliveryCount);
+            redeliveryDelay = calculateRedeliveryDelay(cachedAddressSettings, 
deliveryCount);
 
             if (logger.isTraceEnabled()) {
                logger.trace("Setting redeliveryDelay={} on reference={}", 
redeliveryDelay, reference);
@@ -3896,51 +3953,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
    }
 
-   private void expire(final Transaction tx, final MessageReference ref, 
boolean delivering) throws Exception {
-      SimpleString expiryAddress = addressSettings.getExpiryAddress();
-
-      if (expiryAddress != null && expiryAddress.length() != 0) {
-
-         createExpiryResources();
-
-         Bindings bindingList = 
postOffice.lookupBindingsForAddress(expiryAddress);
-
-         if (bindingList == null || bindingList.getBindings().isEmpty()) {
-            
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
-            acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
-         } else {
-            move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, 
null, delivering);
-         }
-      } else {
-         if (!printErrorExpiring) {
-            printErrorExpiring = true;
-            // print this only once
-            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
-         }
-
-         acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
-      }
-
-      if (server != null && server.hasBrokerMessagePlugins()) {
-         ExpiryLogger expiryLogger = 
(ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
-         if (expiryLogger == null) {
-            expiryLogger = new ExpiryLogger();
-            tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, 
expiryLogger);
-            tx.addOperation(expiryLogger);
-         }
-
-         expiryLogger.addExpiry(address, ref);
-      }
-
-      // potentially auto-delete this queue if this expired the last message
-      tx.addOperation(new TransactionOperationAbstract() {
-         @Override
-         public void afterCommit(Transaction tx) {
-            refCountForConsumers.check();
-         }
-      });
-   }
-
    private class ExpiryLogger extends TransactionOperationAbstract {
 
       List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
@@ -3964,7 +3976,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    @Override
    public boolean sendToDeadLetterAddress(final Transaction tx, final 
MessageReference ref) throws Exception {
-      return sendToDeadLetterAddress(tx, ref, 
addressSettings.getDeadLetterAddress());
+      return sendToDeadLetterAddress(tx, ref, 
cachedAddressSettings.getDeadLetterAddress());
    }
 
    private boolean sendToDeadLetterAddress(final Transaction tx,
@@ -3999,22 +4011,23 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private void createDeadLetterResources() throws Exception {
       AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getAddress().toString());
-      createResources(addressSettings.isAutoCreateDeadLetterResources(), 
addressSettings.getDeadLetterAddress(), 
addressSettings.getDeadLetterQueuePrefix(), 
addressSettings.getDeadLetterQueueSuffix());
+      createResources(String.valueOf(getAddress()), 
addressSettings.isAutoCreateDeadLetterResources(), 
addressSettings.getDeadLetterAddress(), 
addressSettings.getDeadLetterQueuePrefix(), 
addressSettings.getDeadLetterQueueSuffix());
    }
 
-   private void createExpiryResources() throws Exception {
-      AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getAddress().toString());
-      createResources(addressSettings.isAutoCreateExpiryResources(), 
addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), 
addressSettings.getExpiryQueueSuffix());
+   private void createExpiryResources(String address, AddressSettings 
messageAddressSettings) throws Exception {
+      createResources(address, 
messageAddressSettings.isAutoCreateExpiryResources(), 
messageAddressSettings.getExpiryAddress(), 
messageAddressSettings.getExpiryQueuePrefix(), 
messageAddressSettings.getExpiryQueueSuffix());
    }
 
-   private void createResources(boolean isAutoCreate, SimpleString 
destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
-      if (isAutoCreate && !getAddress().equals(destinationAddress)) {
+   private void createResources(String address, boolean isAutoCreate, 
SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) 
throws Exception {
+      if (isAutoCreate && !address.equals(destinationAddress)) {
          if (destinationAddress != null && destinationAddress.length() != 0) {
-            SimpleString destinationQueueName = 
prefix.concat(getAddress()).concat(suffix);
-            SimpleString filter = SimpleString.of(String.format("%s = '%s'", 
Message.HDR_ORIGINAL_ADDRESS, getAddress()));
+            SimpleString destinationQueueName = 
prefix.concat(address).concat(suffix);
+            SimpleString filter = SimpleString.of(String.format("%s = '%s'", 
Message.HDR_ORIGINAL_ADDRESS, address));
             try {
+               logger.debug("Creating Resource queue {}", 
destinationQueueName);
                
server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true),
 true);
             } catch (ActiveMQQueueExistsException e) {
+               logger.debug("resource {} already existed, ignoring outcome", 
destinationQueueName);
                // ignore
             }
          }
@@ -4770,7 +4783,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    private void configureSlowConsumerReaper() {
-      if (addressSettings == null || 
addressSettings.getSlowConsumerThreshold() == 
AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
+      if (cachedAddressSettings == null || 
cachedAddressSettings.getSlowConsumerThreshold() == 
AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
          if (slowConsumerReaperFuture != null) {
             slowConsumerReaperFuture.cancel(false);
             slowConsumerReaperFuture = null;
@@ -4780,13 +4793,13 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
       } else {
          if (slowConsumerReaperRunnable == null) {
-            scheduleSlowConsumerReaper(addressSettings);
-         } else if (slowConsumerReaperRunnable.checkPeriod != 
addressSettings.getSlowConsumerCheckPeriod() || 
slowConsumerReaperRunnable.thresholdInMsgPerSecond != 
addressSettings.getSlowConsumerThreshold() || 
!slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy()))
 {
+            scheduleSlowConsumerReaper(cachedAddressSettings);
+         } else if (slowConsumerReaperRunnable.checkPeriod != 
cachedAddressSettings.getSlowConsumerCheckPeriod() || 
slowConsumerReaperRunnable.thresholdInMsgPerSecond != 
cachedAddressSettings.getSlowConsumerThreshold() || 
!slowConsumerReaperRunnable.policy.equals(cachedAddressSettings.getSlowConsumerPolicy()))
 {
             if (slowConsumerReaperFuture != null) {
                slowConsumerReaperFuture.cancel(false);
                slowConsumerReaperFuture = null;
             }
-            scheduleSlowConsumerReaper(addressSettings);
+            scheduleSlowConsumerReaper(cachedAddressSettings);
          }
       }
    }
@@ -4847,7 +4860,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       @Override
       public void onChange() {
-         addressSettings = 
addressSettingsRepository.getMatch(getAddressSettingsMatch());
+         cachedAddressSettings = 
addressSettingsRepository.getMatch(getAddressSettingsMatch());
          checkDeadLetterAddressAndExpiryAddress();
          configureSlowConsumerReaper();
       }
@@ -4863,10 +4876,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private void checkDeadLetterAddressAndExpiryAddress() {
       if (!Env.isTestEnv() && !internalQueue && 
!address.equals(server.getConfiguration().getManagementNotificationAddress())) {
-         if (addressSettings.getDeadLetterAddress() == null) {
+         if (cachedAddressSettings.getDeadLetterAddress() == null) {
             ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
          }
-         if (addressSettings.getExpiryAddress() == null) {
+         if (cachedAddressSettings.getExpiryAddress() == null) {
             ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
          }
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
new file mode 100644
index 0000000000..1659cd88f9
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      final String queuesPrefix = "queues.";
+      final String queueName = queuesPrefix + getName();
+      final String expirySuffix = ".Expiry";
+      final String expiryPrefix = "myEXP.";
+      final String expiryAddress = "ExpiryAddress";
+      final String resultingExpiryQueue = expiryPrefix + queueName + 
expirySuffix;
+
+      servers[0].getAddressSettingsRepository().clear();
+      servers[0].getAddressSettingsRepository().addMatch(queuesPrefix + "#", 
new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+      servers[0].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      servers[1].getAddressSettingsRepository().clear();
+      servers[1].getAddressSettingsRepository().addMatch(queuesPrefix + "#", 
new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+      servers[1].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      Queue serverQueue0 = 
servers[0].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      
servers[1].createQueue(QueueConfiguration.of(queueName).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      waitForBindings(0, queueName, 1, 0, true);
+      waitForBindings(1, queueName, 1, 0, true);
+
+      waitForBindings(0, queueName, 1, 0, false);
+      waitForBindings(1, queueName, 1, 0, false);
+
+      // pausing the SNF queue to keep messages stuck on the queue
+      servers[0].getPostOffice().getAllBindings().filter(f -> 
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+      assertNotNull(snfPaused);
+
+      long NUMBER_OF_MESSAGES = 100;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session1 = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session1.createProducer(session1.createQueue(queueName));
+         producer.setTimeToLive(100);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session1.createTextMessage("hello"));
+         }
+         session1.commit();
+      }
+      Wait.assertEquals(0L, serverQueue0::getMessageCount, 5_000, 100);
+      Wait.assertEquals(0L, snfPaused::getMessageCount, 5_000, 100);
+      Queue expiryQueue = servers[0].locateQueue(expiryAddress, 
resultingExpiryQueue);
+      assertNotNull(expiryQueue);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, expiryQueue::getMessageCount, 
5_000, 100);
+
+   }
+
+   private void pauseQueue(Binding binding) {
+      assertNull(snfPaused);
+      if (binding instanceof LocalQueueBinding) {
+         logger.info("Pausing {}", binding.getUniqueName());
+         snfPaused = ((LocalQueueBinding) binding).getQueue();
+         snfPaused.pause();
+      }
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
new file mode 100644
index 0000000000..9193f7f64e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ExpireQueueSuffixTest extends ActiveMQTestBase {
+
+   public final SimpleString queueA = SimpleString.of("queueA");
+   public final SimpleString queueB = SimpleString.of("queueB");
+   public final SimpleString expiryAddress = SimpleString.of("myExpiry");
+
+   public final SimpleString expirySuffix = SimpleString.of(".expSuffix");
+   public final long EXPIRY_DELAY = 10L;
+
+   private ActiveMQServer server;
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      
server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L);
+
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix));
+
+      server.start();
+
+      
server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST));
+      
server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   @Test
+   public void testAutoCreationOfExpiryResources() throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      long sendA = 7;
+      long sendB = 11;
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(queueA.toString()));
+         producer.setTimeToLive(100);
+
+         for (int i = 0; i < sendA; i++) {
+            producer.send(session.createTextMessage("queueA"));
+         }
+         session.commit();
+
+         producer = 
session.createProducer(session.createQueue(queueB.toString()));
+         producer.setTimeToLive(100);
+         for (int i = 0; i < sendB; i++) {
+            producer.send(session.createTextMessage("queueB"));
+         }
+         session.commit();
+      }
+
+      Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + 
queueA + expirySuffix) != null, 5000);
+      Queue expA = server.locateQueue(expiryAddress.toString(), "EXP." + 
queueA + expirySuffix);
+      assertNotNull(expA);
+
+      Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + 
queueB + expirySuffix) != null, 5000);
+      Queue expB = server.locateQueue(expiryAddress.toString(), "EXP." + 
queueB + expirySuffix);
+      assertNotNull(expB);
+
+      Wait.assertEquals(sendA, expA::getMessageCount, 5000, 100);
+      Wait.assertEquals(sendB, expB::getMessageCount, 5000, 100);
+   }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to