gemmellr commented on code in PR #5327:
URL: https://github.com/apache/activemq-artemis/pull/5327#discussion_r1825666789


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final 
ServerConsumer consumer, bo
       refCountForConsumers.check();
 
       if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, 
addressSettings.getExpiryAddress(), consumer));
+         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, 
settingsToUse.getExpiryAddress(), consumer));
+      }
+   }
+
+
+   AddressSettings getMessageAddressSettings(Message message) {
+      if (message.getAddress().equals(String.valueOf(address))) {
+         return addressSettings;
+      } else {
+         return 
server.getAddressSettingsRepository().getMatch(message.getAddress());
       }
    }
 
+   private void expire(final Transaction tx, final MessageReference ref, 
boolean delivering) throws Exception {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Expiry on {}, expiryAddress={}", this.address, 
addressSettings.getExpiryAddress());
+      }
+
+      AddressSettings settingsToUse = 
getMessageAddressSettings(ref.getMessage());
+      SimpleString expiryAddress = settingsToUse.getExpiryAddress();

Review Comment:
   Similar comment to the one on the other expire method about logging one 
expiry address and then immediately using a potentially different one.
   
   Would the other method be clearer if it also created an _expiryAddress_ 
variable like this method?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      servers[0].getAddressSettingsRepository().clear();
+      servers[0].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[0].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      servers[1].getAddressSettingsRepository().clear();
+      servers[1].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[1].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      Queue serverQueue0 = 
servers[0].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[0].createQueue(QueueConfiguration.of("Expiry" + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("Expiry." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      waitForBindings(0, "queues." + getName(), 1, 0, true);
+      waitForBindings(1, "queues." + getName(), 1, 0, true);
+
+      waitForBindings(0, "queues." + getName(), 1, 0, false);
+      waitForBindings(1, "queues." + getName(), 1, 0, false);
+
+      // pausing the SNF queue to keep messages stuck on the queue
+      servers[0].getPostOffice().getAllBindings().filter(f -> 
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+      assertNotNull(snfPaused);
+
+      long NUMBER_OF_MESSAGES = 100;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61617");

Review Comment:
   Is factory2 unused, as it appears to be?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final 
ServerConsumer consumer, bo
       refCountForConsumers.check();
 
       if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, 
addressSettings.getExpiryAddress(), consumer));
+         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, 
settingsToUse.getExpiryAddress(), consumer));
+      }
+   }
+
+
+   AddressSettings getMessageAddressSettings(Message message) {
+      if (message.getAddress().equals(String.valueOf(address))) {
+         return addressSettings;
+      } else {
+         return 
server.getAddressSettingsRepository().getMatch(message.getAddress());
       }
    }
 
+   private void expire(final Transaction tx, final MessageReference ref, 
boolean delivering) throws Exception {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Expiry on {}, expiryAddress={}", this.address, 
addressSettings.getExpiryAddress());
+      }
+
+      AddressSettings settingsToUse = 
getMessageAddressSettings(ref.getMessage());
+      SimpleString expiryAddress = settingsToUse.getExpiryAddress();
+
+      if (expiryAddress != null && expiryAddress.length() != 0) {

Review Comment:
   Seems strange to length-check it here but not in the check above in the 
other expire method?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      servers[0].getAddressSettingsRepository().clear();
+      servers[0].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[0].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      servers[1].getAddressSettingsRepository().clear();
+      servers[1].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[1].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      Queue serverQueue0 = 
servers[0].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[0].createQueue(QueueConfiguration.of("Expiry" + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("Expiry." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      waitForBindings(0, "queues." + getName(), 1, 0, true);
+      waitForBindings(1, "queues." + getName(), 1, 0, true);
+
+      waitForBindings(0, "queues." + getName(), 1, 0, false);
+      waitForBindings(1, "queues." + getName(), 1, 0, false);
+
+      // pausing the SNF queue to keep messages stuck on the queue
+      servers[0].getPostOffice().getAllBindings().filter(f -> 
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+      assertNotNull(snfPaused);
+
+      long NUMBER_OF_MESSAGES = 100;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61617");
+      try (Connection connection = factory.createConnection()) {
+         Session session1 = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session1.createProducer(session1.createQueue("queues." + getName()));
+         producer.setTimeToLive(2_000);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session1.createTextMessage("hello"));
+         }
+         session1.commit();
+      }
+      Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100);
+      Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100);
+      Queue expiryQueue = servers[0].locateQueue("Expiry", "EXP.queues." + 
getName() + ".Expiry");

Review Comment:
   It would be a lot easier to follow this test if there were variables for the 
prefix, suffix, and 'main queue name' values that were then used to set up the 
configuration and composed for the other usages such as this (maybe via another 
variable).  Especially given the slightly overloaded use of "Expiry" in many of 
them. 



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws 
Exception {
     *           hence no information about delivering statistics should be 
updated. */
    @Override
    public void expire(final MessageReference ref, final ServerConsumer 
consumer, boolean delivering) throws Exception {
-      if (addressSettings.getExpiryAddress() != null) {
-         createExpiryResources();
+      if (logger.isDebugEnabled()) {
+         logger.debug("Expiry on {}, expiryAddress={}", this.address, 
addressSettings.getExpiryAddress());
+      }

Review Comment:
   Would including the queue name be helpful to be clear on which queue is 
logging this (and especially given the Jira, it seems likely to be different 
than may initially be expected)?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws 
Exception {
     *           hence no information about delivering statistics should be 
updated. */
    @Override
    public void expire(final MessageReference ref, final ServerConsumer 
consumer, boolean delivering) throws Exception {
-      if (addressSettings.getExpiryAddress() != null) {
-         createExpiryResources();
+      if (logger.isDebugEnabled()) {
+         logger.debug("Expiry on {}, expiryAddress={}", this.address, 
addressSettings.getExpiryAddress());
+      }
+      AddressSettings settingsToUse = 
getMessageAddressSettings(ref.getMessage());
+      if (settingsToUse.getExpiryAddress() != null) {
+         createExpiryResources(ref.getMessage().getAddress(), settingsToUse);
 
          if (logger.isTraceEnabled()) {
             logger.trace("moving expired reference {} to address = {} from 
queue={}", ref, addressSettings.getExpiryAddress(), name);
          }
 
-         move(null, addressSettings.getExpiryAddress(), null, ref, false, 
AckReason.EXPIRED, consumer, null, delivering);
+         move(null, settingsToUse.getExpiryAddress(), null, ref, false, 
AckReason.EXPIRED, consumer, null, delivering);

Review Comment:
   This potentially changed the expiry address being used, but the trace log 
above it didn't change and is still using the other settings, so it is 
potentially logging something different than what is being done here.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws 
Exception {
     *           hence no information about delivering statistics should be 
updated. */
    @Override
    public void expire(final MessageReference ref, final ServerConsumer 
consumer, boolean delivering) throws Exception {
-      if (addressSettings.getExpiryAddress() != null) {
-         createExpiryResources();
+      if (logger.isDebugEnabled()) {
+         logger.debug("Expiry on {}, expiryAddress={}", this.address, 
addressSettings.getExpiryAddress());
+      }
+      AddressSettings settingsToUse = 
getMessageAddressSettings(ref.getMessage());

Review Comment:
   This debug logged about expiry on one expiry address value above, but then 
immediately potentially used a different expiry address here, one which it 
doesn't log. Seems like it might be good to log the actual expiry address used 
if it just changed?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ExpireQueueSuffixTest extends ActiveMQTestBase {
+
+   public final SimpleString queueA = SimpleString.of("queueA");
+   public final SimpleString queueB = SimpleString.of("queueB");
+   public final SimpleString expiryAddress = SimpleString.of("myExpiry");
+
+   public final SimpleString expirySuffix = SimpleString.of(".expSuffix");
+   public final long EXPIRY_DELAY = 10L;
+
+   private ActiveMQServer server;
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      
server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L);
+
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix));
+
+      server.start();
+
+      
server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST));
+      
server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   @Test
+   public void testAutoCreationOfExpiryResources() throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      long sendA = 7;
+      long sendB = 11;
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(queueA.toString()));
+         producer.setTimeToLive(100);
+
+         for (int i = 0; i < sendA; i++) {
+            producer.send(session.createTextMessage("queueA"));
+         }
+         session.commit();
+
+         producer = 
session.createProducer(session.createQueue(queueB.toString()));
+         producer.setTimeToLive(100);
+         for (int i = 0; i < sendB; i++) {
+            producer.send(session.createTextMessage("queueB"));
+         }
+         session.commit();
+      }
+
+      Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + 
queueA + expirySuffix) != null, 5000);

Review Comment:
   Hehe, yes, on my earlier comment about using variables for 
clarity...something a lot like this. Could have saved myself some typing by 
reading ahead and referencing this hehe.
   
   (A variable could perhaps be added for this whole name to make it easier to 
follow)



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      servers[0].getAddressSettingsRepository().clear();
+      servers[0].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[0].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      servers[1].getAddressSettingsRepository().clear();
+      servers[1].getAddressSettingsRepository().addMatch("queues#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+      servers[1].getAddressSettingsRepository().addMatch("$#", new 
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      Queue serverQueue0 = 
servers[0].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("queues." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[0].createQueue(QueueConfiguration.of("Expiry" + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of("Expiry." + 
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));

Review Comment:
   Given the earlier address settings look the same for both servers, is it 
expected these 2 queue creations should be subtly different? If so maybe a 
comment why?
   
   Why are they created outside either of the settings prefixes that were just 
configured? The sends seem to go to different queues, and the asserts are done 
on different queues. What are these created and/or used for? Was there meant to 
be an assert that these didn't end up with any messages?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to