gemmellr commented on code in PR #5327: URL: https://github.com/apache/activemq-artemis/pull/5327#discussion_r1825666789
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } + } + + + AddressSettings getMessageAddressSettings(Message message) { + if (message.getAddress().equals(String.valueOf(address))) { + return addressSettings; + } else { + return server.getAddressSettingsRepository().getMatch(message.getAddress()); } } + private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + SimpleString expiryAddress = settingsToUse.getExpiryAddress(); Review Comment: Similar comment to other expire method about logging one thing then immediately doing another. Would the other method be clearer if it also created an _expiryAddress_ variable like this method? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[0].createQueue(QueueConfiguration.of("Expiry" + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("Expiry." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + waitForBindings(0, "queues." + getName(), 1, 0, true); + waitForBindings(1, "queues." + getName(), 1, 0, true); + + waitForBindings(0, "queues." + getName(), 1, 0, false); + waitForBindings(1, "queues." + getName(), 1, 0, false); + + // pausing the SNF queue to keep messages stuck on the queue + servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue); + assertNotNull(snfPaused); + + long NUMBER_OF_MESSAGES = 100; + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617"); Review Comment: Is factory2 unused as it looks? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } + } + + + AddressSettings getMessageAddressSettings(Message message) { + if (message.getAddress().equals(String.valueOf(address))) { + return addressSettings; + } else { + return server.getAddressSettingsRepository().getMatch(message.getAddress()); } } + private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + SimpleString expiryAddress = settingsToUse.getExpiryAddress(); + + if (expiryAddress != null && expiryAddress.length() != 0) { Review Comment: Seems strange to length-check it here but not in the check above in the other expire method? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[0].createQueue(QueueConfiguration.of("Expiry" + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("Expiry." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + waitForBindings(0, "queues." + getName(), 1, 0, true); + waitForBindings(1, "queues." + getName(), 1, 0, true); + + waitForBindings(0, "queues." + getName(), 1, 0, false); + waitForBindings(1, "queues." + getName(), 1, 0, false); + + // pausing the SNF queue to keep messages stuck on the queue + servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue); + assertNotNull(snfPaused); + + long NUMBER_OF_MESSAGES = 100; + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61617"); + try (Connection connection = factory.createConnection()) { + Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session1.createProducer(session1.createQueue("queues." + getName())); + producer.setTimeToLive(2_000); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session1.createTextMessage("hello")); + } + session1.commit(); + } + Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100); + Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100); + Queue expiryQueue = servers[0].locateQueue("Expiry", "EXP.queues." + getName() + ".Expiry"); Review Comment: It would be a lot easier to follow this test if there were variables for the prefix, suffix, and 'main queue name' values that were then used to set up the configuration and composed for the other usages such as this (maybe via another variable). Especially given the slightly overloaded use of "Expiry" in many of them. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } Review Comment: Would including the queue name be helpful to be clear on which queue is logging this (and especially given the Jira, it seems likely to be different than may initially be expected)? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + if (settingsToUse.getExpiryAddress() != null) { + createExpiryResources(ref.getMessage().getAddress(), settingsToUse); if (logger.isTraceEnabled()) { logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name); } - move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering); + move(null, settingsToUse.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering); Review Comment: This potentially changed the expiry address being used, but the trace log above it didnt change and is still using the other settings, so it is potentially logging something different than what is being done here. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + if (logger.isDebugEnabled()) { + logger.debug("Expiry on {}, expiryAddress={}", this.address, addressSettings.getExpiryAddress()); + } + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); Review Comment: This debug logged about expiry on one expiry address value above, but then immediately potentially used a different expiry address here, one which it doesn't log. Seems like it might be good to log the actual expiry address used if it just changed? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.server; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class ExpireQueueSuffixTest extends ActiveMQTestBase { + + public final SimpleString queueA = SimpleString.of("queueA"); + public final SimpleString queueB = SimpleString.of("queueB"); + public final SimpleString expiryAddress = SimpleString.of("myExpiry"); + + public final SimpleString expirySuffix = SimpleString.of(".expSuffix"); + public final long EXPIRY_DELAY = 10L; + + private ActiveMQServer server; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L); + + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix)); + + server.start(); + + server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST)); + } + + @Test + public void testAutoCreationOfExpiryResources() throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + long sendA = 7; + long sendB = 11; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(queueA.toString())); + producer.setTimeToLive(100); + + for (int i = 0; i < sendA; i++) { + producer.send(session.createTextMessage("queueA")); + } + session.commit(); + + producer = session.createProducer(session.createQueue(queueB.toString())); + producer.setTimeToLive(100); + for (int i = 0; i < sendB; i++) { + producer.send(session.createTextMessage("queueB")); + } + session.commit(); + } + + Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." + queueA + expirySuffix) != null, 5000); Review Comment: Hehe, yes, on my earlier comment about using variables for clarity...something a lot like this. Could have saved myself some typing by reading ahead and referencing this hehe. (A variable could perhaps be added for this whole name to make it easier to follow) ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry"))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("queues." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[0].createQueue(QueueConfiguration.of("Expiry" + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of("Expiry." + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); Review Comment: Given the earlier address settings look the same for both servers, is it expected these 2 queue creations should be subtly different? If so maybe a comment why? Why are they created outside either of the settings prefixes that were just configured? The sends seem to go to different queues, and the asserts are done on different queues. What are these created and/or used for? Was there meant to be an assert that these didnt end up with any messages? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact