gemmellr commented on code in PR #5327:
URL: https://github.com/apache/activemq-artemis/pull/5327#discussion_r1825975145


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,18 +2160,77 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo
       refCountForConsumers.check();
 
       if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
+         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
+      }
+   }
+
+   private void expireFromScan(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
+

Review Comment:
   Superfluous newline



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2130,26 @@ public void expire(final MessageReference ref) throws Exception {
     *           hence no information about delivering statistics should be updated. */
    @Override
    public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
-      if (addressSettings.getExpiryAddress() != null) {
-         createExpiryResources();
+      AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());

Review Comment:
   The expireFromScan method goes on to do "SimpleString expiryAddress =
   settingsToUse.getExpiryAddress();" and then uses that variable in [most of]
   the places the value is used. It's strange for two very similar, related
   methods right next to each other to differ in that. It would be more
   readable later if either both used the variable or neither did.
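
   As a rough sketch of the "both use the variable" option: the classes below
   are hypothetical stand-ins (ExpirySketch, Settings and the recorded action
   strings are not Artemis code), and only the shape of the two methods is the
   point. Each reads the expiry address into a local once and reuses it.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only: none of these are the real Artemis types.
final class ExpirySketch {

   // Hypothetical stand-in for AddressSettings; the expiry address may be null.
   static final class Settings {
      private final String expiryAddress;

      Settings(String expiryAddress) {
         this.expiryAddress = expiryAddress;
      }

      String getExpiryAddress() {
         return expiryAddress;
      }
   }

   // Records what each method did, so the two paths can be compared.
   final List<String> actions = new ArrayList<>();

   // Shape of expire(...): look the address up once and reuse the local.
   void expire(Settings settingsToUse, String ref) {
      final String expiryAddress = settingsToUse.getExpiryAddress();
      if (expiryAddress != null) {
         actions.add("createExpiryResources " + expiryAddress);
      }
      actions.add("messageExpired " + ref + " -> " + expiryAddress);
   }

   // Shape of expireFromScan(...): the same local, used the same way.
   void expireFromScan(Settings settingsToUse, String ref) {
      final String expiryAddress = settingsToUse.getExpiryAddress();
      actions.add("expireFromScan " + ref + ", expiryAddress=" + expiryAddress);
      if (expiryAddress != null) {
         actions.add("createExpiryResources " + expiryAddress);
      }
   }
}
```

   The "neither uses the variable" option would be just as consistent; the
   local simply avoids repeating the getter call at every use.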



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      String queuesPrefix = "queues.";
+      final String expirySuffix = ".Expiry";
+      final String expiryPrefix = "myEXP.";
+      final String expiryAddress = "ExpiryAddress";
+      final String resultingExpiryQueue = expiryPrefix + queuesPrefix + getName() + expirySuffix;
+
+      servers[0].getAddressSettingsRepository().clear();
+      servers[0].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+      servers[0].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      servers[1].getAddressSettingsRepository().clear();
+      servers[1].getAddressSettingsRepository().addMatch("queues#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(expirySuffix)).setExpiryQueuePrefix(SimpleString.of(expiryPrefix)).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of(expiryAddress)));
+      servers[1].getAddressSettingsRepository().addMatch("$#", new AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+      Queue serverQueue0 = servers[0].createQueue(QueueConfiguration.of(queuesPrefix + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+      servers[1].createQueue(QueueConfiguration.of(queuesPrefix + getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      waitForBindings(0, queuesPrefix + getName(), 1, 0, true);
+      waitForBindings(1, queuesPrefix + getName(), 1, 0, true);
+
+      waitForBindings(0, queuesPrefix + getName(), 1, 0, false);
+      waitForBindings(1, queuesPrefix + getName(), 1, 0, false);
+
+      // pausing the SNF queue to keep messages stuck on the queue
+      servers[0].getPostOffice().getAllBindings().filter(f -> f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+      assertNotNull(snfPaused);
+
+      long NUMBER_OF_MESSAGES = 100;
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session1.createProducer(session1.createQueue(queuesPrefix + getName()));
+         producer.setTimeToLive(500);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session1.createTextMessage("hello"));
+         }
+         session1.commit();
+      }
+      Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100);
+      Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100);

Review Comment:
   Do we need a 50-second timeout, with the expiry reaper set at 10ms?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -274,7 +274,8 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory
 
    private final StorageManager storageManager;
 
-   private volatile AddressSettings addressSettings;
+   // instead of looking the address Settings everytime, we cache and monitore it through onChange

Review Comment:
   ```suggestion
      // Instead of looking up the AddressSettings every time, we cache and monitor it through onChange
   ```



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,18 +2160,77 @@ public void expire(final MessageReference ref, final ServerConsumer consumer, bo
       refCountForConsumers.check();
 
       if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
+         server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
+      }
+   }
+
+   private void expireFromScan(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
+
+      AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());
+      SimpleString expiryAddress = settingsToUse.getExpiryAddress();
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("expireFromScan on {}/{}, expiryAddress={}", this.address, this.name, settingsToUse.getExpiryAddress());

Review Comment:
   ```suggestion
            logger.debug("expireFromScan on {}/{}, expiryAddress={}", this.address, this.name, expiryAddress);
   ```



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Queue snfPaused;
+
+   @Test
+   public void testExpiryOnSNF() throws Exception {
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+      servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+      startServers(0, 1);
+
+      String queuesPrefix = "queues.";
+      final String expirySuffix = ".Expiry";
+      final String expiryPrefix = "myEXP.";
+      final String expiryAddress = "ExpiryAddress";
+      final String resultingExpiryQueue = expiryPrefix + queuesPrefix + getName() + expirySuffix;

Review Comment:
   Strange for one of the 5 to be non-final when it doesn't change.
   
   Would be nice if the address settings configuration used _queuesPrefix_, so
   it is obvious what the prefix is actually there for.
   
   ```suggestion
         final String queuesPrefix = "queues.";
         final String queueName = queuesPrefix + getName();
         final String expirySuffix = ".Expiry";
         final String expiryPrefix = "myEXP.";
         final String expiryAddress = "ExpiryAddress";
         final String resultingExpiryQueue = expiryPrefix + queueName + expirySuffix;
   ```
   
   (Plus all the later changes to use simply _queueName_ instead of
   _queuesPrefix + getName()_.)
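
   To make the naming relationship concrete, here is a small plain-Java
   sketch. ExpiryNames and its methods are hypothetical helpers, not part of
   the test, and testName stands in for getName(). It also assumes that a
   match built as queuesPrefix + "#" (that is, "queues.#") would be acceptable
   in place of the literal "queues#" used in the diff; that would need
   checking against the wildcard rules.

```java
// Illustrative only: a plain-Java stand-in for the names built in the test.
final class ExpiryNames {
   static final String QUEUES_PREFIX = "queues.";
   static final String EXPIRY_PREFIX = "myEXP.";
   static final String EXPIRY_SUFFIX = ".Expiry";

   // Assumption: a "queues.#" match is acceptable in place of the literal "queues#".
   static String addressMatch() {
      return QUEUES_PREFIX + "#";
   }

   // testName stands in for the value returned by getName() in the test.
   static String queueName(String testName) {
      return QUEUES_PREFIX + testName;
   }

   // Same composition as resultingExpiryQueue in the suggestion above.
   static String resultingExpiryQueue(String testName) {
      return EXPIRY_PREFIX + queueName(testName) + EXPIRY_SUFFIX;
   }
}
```

   With everything derived from the one prefix constant, the address settings
   match and the queue name cannot drift apart.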



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

