gemmellr commented on code in PR #5327: URL: https://github.com/apache/activemq-artemis/pull/5327#discussion_r1825975145
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2147,18 +2160,77 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } + } + + private void expireFromScan(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { + Review Comment: Superfluous newline ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2129,14 +2130,26 @@ public void expire(final MessageReference ref) throws Exception { * hence no information about delivering statistics should be updated. */ @Override public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception { - if (addressSettings.getExpiryAddress() != null) { - createExpiryResources(); + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); Review Comment: The expireFromScan method goes on to do "SimpleString expiryAddress = settingsToUse.getExpiryAddress();" and then use that variable in [most of] the places the value is used. Its strange for two very similar related methods right next to each other to be differing in that. Would be more readable later to either both use the variable, or neither use the variable. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + String queuesPrefix = "queues."; + final String expirySuffix = ".Expiry"; + final String expiryPrefix = "myEXP."; + final String expiryAddress = "ExpiryAddress"; + final String resultingExpiryQueue = expiryPrefix + queuesPrefix + getName() + expirySuffix; + + servers[0].getAddressSettingsRepository().clear(); + servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress))); + servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + servers[1].getAddressSettingsRepository().clear(); + servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress))); + servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry"))); + + Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of(queuesPrefix + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + servers[1].createQueue(QueueConfiguration.of(queuesPrefix + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + waitForBindings(0, queuesPrefix + getName(), 1, 0, true); + waitForBindings(1, queuesPrefix + getName(), 1, 0, true); + + waitForBindings(0, queuesPrefix + getName(), 1, 0, false); + waitForBindings(1, queuesPrefix + getName(), 1, 0, false); + + // pausing the SNF queue to keep messages stuck on the queue + servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue); + assertNotNull(snfPaused); + + long NUMBER_OF_MESSAGES = 100; + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session1.createProducer(session1.createQueue(queuesPrefix + getName())); + producer.setTimeToLive(500); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session1.createTextMessage("hello")); + } + session1.commit(); + } + Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100); + Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100); Review Comment: Do we need 50 seconds timeout, with the expiry reaper set at 10ms ? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -274,7 +274,8 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory private final StorageManager storageManager; - private volatile AddressSettings addressSettings; + // instead of looking the address Settings everytime, we cache and monitore it through onChange Review Comment: ```suggestion // Instead of looking up the AddressSettings every time, we cache and monitor it through onChange ``` ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -2147,18 +2160,77 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo refCountForConsumers.check(); if (server != null && server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer)); + server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer)); + } + } + + private void expireFromScan(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception { + + AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage()); + SimpleString expiryAddress = settingsToUse.getExpiryAddress(); + + if (logger.isDebugEnabled()) { + logger.debug("expireFromScan on {}/{}, expiryAddress={}", this.address, this.name, settingsToUse.getExpiryAddress()); Review Comment: ```suggestion logger.debug("expireFromScan on {}/{}, expiryAddress={}", this.address, this.name, expiryAddress); ``` ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.expiry; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ClusteredExpiryTest extends ClusterTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + Queue snfPaused; + + @Test + public void testExpiryOnSNF() throws Exception { + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0); + + servers[0].getConfiguration().setMessageExpiryScanPeriod(10); + + startServers(0, 1); + + String queuesPrefix = "queues."; + final String expirySuffix = ".Expiry"; + final String expiryPrefix = "myEXP."; + final String expiryAddress = "ExpiryAddress"; + final String resultingExpiryQueue = expiryPrefix + queuesPrefix + getName() + expirySuffix; Review Comment: Strange for one of 5 to be non-final when it doesnt change. Would be nice if the address settings configuration used _queuesPrefix_ so it was obvious that was what the prefix is actually there for. ```suggestion final String queuesPrefix = "queues."; final String queueName = queuesPrefix + getName(); final String expirySuffix = ".Expiry"; final String expiryPrefix = "myEXP."; final String expiryAddress = "ExpiryAddress"; final String resultingExpiryQueue = expiryPrefix + queueName + expirySuffix; ``` (Plus all the later changes to use simply _queueName_ instead of _queuesPrefix + getName()__ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact