[
https://issues.apache.org/jira/browse/ARTEMIS-5119?focusedWorklogId=941351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-941351
]
ASF GitHub Bot logged work on ARTEMIS-5119:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Nov/24 11:17
Start Date: 01/Nov/24 11:17
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #5327:
URL: https://github.com/apache/activemq-artemis/pull/5327#discussion_r1825666789
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final
ServerConsumer consumer, bo
refCountForConsumers.check();
if (server != null && server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
addressSettings.getExpiryAddress(), consumer));
+ server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
settingsToUse.getExpiryAddress(), consumer));
+ }
+ }
+
+
+ AddressSettings getMessageAddressSettings(Message message) {
+ if (message.getAddress().equals(String.valueOf(address))) {
+ return addressSettings;
+ } else {
+ return
server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}
+ private void expire(final Transaction tx, final MessageReference ref,
boolean delivering) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Expiry on {}, expiryAddress={}", this.address,
addressSettings.getExpiryAddress());
+ }
+
+ AddressSettings settingsToUse =
getMessageAddressSettings(ref.getMessage());
+ SimpleString expiryAddress = settingsToUse.getExpiryAddress();
Review Comment:
Similar comment to other expire method about logging one thing then
immediately doing another.
Would the other method be clearer if it also created an _expiryAddress_
variable like this method?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ Queue snfPaused;
+
+ @Test
+ public void testExpiryOnSNF() throws Exception {
+ setupServer(0, true, true);
+ setupServer(1, true, true);
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+ servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+ startServers(0, 1);
+
+ servers[0].getAddressSettingsRepository().clear();
+ servers[0].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[0].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ servers[1].getAddressSettingsRepository().clear();
+ servers[1].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[1].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ Queue serverQueue0 =
servers[0].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[0].createQueue(QueueConfiguration.of("Expiry" +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("Expiry." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+ waitForBindings(0, "queues." + getName(), 1, 0, true);
+ waitForBindings(1, "queues." + getName(), 1, 0, true);
+
+ waitForBindings(0, "queues." + getName(), 1, 0, false);
+ waitForBindings(1, "queues." + getName(), 1, 0, false);
+
+ // pausing the SNF queue to keep messages stuck on the queue
+ servers[0].getPostOffice().getAllBindings().filter(f ->
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+ assertNotNull(snfPaused);
+
+ long NUMBER_OF_MESSAGES = 100;
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61617");
Review Comment:
Is factory2 unused as it looks?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2147,10 +2151,71 @@ public void expire(final MessageReference ref, final
ServerConsumer consumer, bo
refCountForConsumers.check();
if (server != null && server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
addressSettings.getExpiryAddress(), consumer));
+ server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref,
settingsToUse.getExpiryAddress(), consumer));
+ }
+ }
+
+
+ AddressSettings getMessageAddressSettings(Message message) {
+ if (message.getAddress().equals(String.valueOf(address))) {
+ return addressSettings;
+ } else {
+ return
server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}
+ private void expire(final Transaction tx, final MessageReference ref,
boolean delivering) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Expiry on {}, expiryAddress={}", this.address,
addressSettings.getExpiryAddress());
+ }
+
+ AddressSettings settingsToUse =
getMessageAddressSettings(ref.getMessage());
+ SimpleString expiryAddress = settingsToUse.getExpiryAddress();
+
+ if (expiryAddress != null && expiryAddress.length() != 0) {
Review Comment:
Seems strange to length-check it here but not in the check above in the
other expire method?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ Queue snfPaused;
+
+ @Test
+ public void testExpiryOnSNF() throws Exception {
+ setupServer(0, true, true);
+ setupServer(1, true, true);
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+ servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+ startServers(0, 1);
+
+ servers[0].getAddressSettingsRepository().clear();
+ servers[0].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[0].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ servers[1].getAddressSettingsRepository().clear();
+ servers[1].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[1].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ Queue serverQueue0 =
servers[0].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[0].createQueue(QueueConfiguration.of("Expiry" +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("Expiry." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+ waitForBindings(0, "queues." + getName(), 1, 0, true);
+ waitForBindings(1, "queues." + getName(), 1, 0, true);
+
+ waitForBindings(0, "queues." + getName(), 1, 0, false);
+ waitForBindings(1, "queues." + getName(), 1, 0, false);
+
+ // pausing the SNF queue to keep messages stuck on the queue
+ servers[0].getPostOffice().getAllBindings().filter(f ->
f.getUniqueName().toString().startsWith("$.artemis.internal.sf")).forEach(this::pauseQueue);
+ assertNotNull(snfPaused);
+
+ long NUMBER_OF_MESSAGES = 100;
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61617");
+ try (Connection connection = factory.createConnection()) {
+ Session session1 = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session1.createProducer(session1.createQueue("queues." + getName()));
+ producer.setTimeToLive(2_000);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session1.createTextMessage("hello"));
+ }
+ session1.commit();
+ }
+ Wait.assertEquals(0L, serverQueue0::getMessageCount, 50_000, 100);
+ Wait.assertEquals(0L, snfPaused::getMessageCount, 50_000, 100);
+ Queue expiryQueue = servers[0].locateQueue("Expiry", "EXP.queues." +
getName() + ".Expiry");
Review Comment:
It would be a lot easier to follow this test if there were variables for the
prefix, suffix, and 'main queue name' values that were then used to set up the
configuration and composed for the other usages such as this (maybe via another
variable). Especially given the slightly overloaded use of "Expiry" in many of
them.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws
Exception {
* hence no information about delivering statistics should be
updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer
consumer, boolean delivering) throws Exception {
- if (addressSettings.getExpiryAddress() != null) {
- createExpiryResources();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Expiry on {}, expiryAddress={}", this.address,
addressSettings.getExpiryAddress());
+ }
Review Comment:
Would including the queue name be helpful to be clear on which queue is
logging this (and especially given the Jira, it seems likely to be different
than may initially be expected)?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws
Exception {
* hence no information about delivering statistics should be
updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer
consumer, boolean delivering) throws Exception {
- if (addressSettings.getExpiryAddress() != null) {
- createExpiryResources();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Expiry on {}, expiryAddress={}", this.address,
addressSettings.getExpiryAddress());
+ }
+ AddressSettings settingsToUse =
getMessageAddressSettings(ref.getMessage());
+ if (settingsToUse.getExpiryAddress() != null) {
+ createExpiryResources(ref.getMessage().getAddress(), settingsToUse);
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference {} to address = {} from
queue={}", ref, addressSettings.getExpiryAddress(), name);
}
- move(null, addressSettings.getExpiryAddress(), null, ref, false,
AckReason.EXPIRED, consumer, null, delivering);
+ move(null, settingsToUse.getExpiryAddress(), null, ref, false,
AckReason.EXPIRED, consumer, null, delivering);
Review Comment:
This potentially changed the expiry address being used, but the trace log
above it didnt change and is still using the other settings, so it is
potentially logging something different than what is being done here.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -2129,14 +2129,18 @@ public void expire(final MessageReference ref) throws
Exception {
* hence no information about delivering statistics should be
updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer
consumer, boolean delivering) throws Exception {
- if (addressSettings.getExpiryAddress() != null) {
- createExpiryResources();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Expiry on {}, expiryAddress={}", this.address,
addressSettings.getExpiryAddress());
+ }
+ AddressSettings settingsToUse =
getMessageAddressSettings(ref.getMessage());
Review Comment:
This debug logged about expiry on one expiry address value above, but then
immediately potentially used a different expiry address here, one which it
doesn't log. Seems like it might be good to log the actual expiry address used
if it just changed?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpireQueueSuffixTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ExpireQueueSuffixTest extends ActiveMQTestBase {
+
+ public final SimpleString queueA = SimpleString.of("queueA");
+ public final SimpleString queueB = SimpleString.of("queueB");
+ public final SimpleString expiryAddress = SimpleString.of("myExpiry");
+
+ public final SimpleString expirySuffix = SimpleString.of(".expSuffix");
+ public final long EXPIRY_DELAY = 10L;
+
+ private ActiveMQServer server;
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, true);
+
server.getConfiguration().setAddressQueueScanPeriod(50L).setMessageExpiryScanPeriod(50L);
+
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setAutoCreateExpiryResources(true).setExpiryAddress(expiryAddress).setExpiryDelay(EXPIRY_DELAY).setExpiryQueueSuffix(expirySuffix));
+
+ server.start();
+
+
server.createQueue(QueueConfiguration.of(queueA).setRoutingType(RoutingType.ANYCAST));
+
server.createQueue(QueueConfiguration.of(queueB).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ @Test
+ public void testAutoCreationOfExpiryResources() throws Exception {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ long sendA = 7;
+ long sendB = 11;
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueA.toString()));
+ producer.setTimeToLive(100);
+
+ for (int i = 0; i < sendA; i++) {
+ producer.send(session.createTextMessage("queueA"));
+ }
+ session.commit();
+
+ producer =
session.createProducer(session.createQueue(queueB.toString()));
+ producer.setTimeToLive(100);
+ for (int i = 0; i < sendB; i++) {
+ producer.send(session.createTextMessage("queueB"));
+ }
+ session.commit();
+ }
+
+ Wait.waitFor(() -> server.locateQueue(expiryAddress.toString(), "EXP." +
queueA + expirySuffix) != null, 5000);
Review Comment:
Hehe, yes, on my earlier comment about using variables for
clarity...something a lot like this. Could have saved myself some typing by
reading ahead and referencing this hehe.
(A variable could perhaps be added for this whole name to make it easier to
follow)
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/expiry/ClusteredExpiryTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.expiry;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ClusteredExpiryTest extends ClusterTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ Queue snfPaused;
+
+ @Test
+ public void testExpiryOnSNF() throws Exception {
+ setupServer(0, true, true);
+ setupServer(1, true, true);
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.STRICT, 1, true, 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.STRICT, 1, true, 1, 0);
+
+ servers[0].getConfiguration().setMessageExpiryScanPeriod(10);
+
+ startServers(0, 1);
+
+ servers[0].getAddressSettingsRepository().clear();
+ servers[0].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[0].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ servers[1].getAddressSettingsRepository().clear();
+ servers[1].getAddressSettingsRepository().addMatch("queues#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".Expiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("Expiry")));
+ servers[1].getAddressSettingsRepository().addMatch("$#", new
AddressSettings().setExpiryQueueSuffix(SimpleString.of(".SNFExpiry")).setAutoCreateExpiryResources(true).setExpiryAddress(SimpleString.of("SNFExpiry")));
+
+ Queue serverQueue0 =
servers[0].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("queues." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[0].createQueue(QueueConfiguration.of("Expiry" +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+ servers[1].createQueue(QueueConfiguration.of("Expiry." +
getName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Review Comment:
Given the earlier address settings look the same for both servers, is it
expected these 2 queue creations should be subtly different? If so maybe a
comment why?
Why are they created outside either of the settings prefixes that were just
configured? The sends seem to go to different queues, and the asserts are done
on different queues. What are these created and/or used for? Was there meant to
be an assert that these didnt end up with any messages?
Issue Time Tracking
-------------------
Worklog Id: (was: 941351)
Time Spent: 20m (was: 10m)
> Messages that expire in store-and-forward (sf-) queues should go to original
> queue's configured expiry queue
> ------------------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-5119
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5119
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.37.0
> Reporter: Rakhi Kumari
> Assignee: Clebert Suconic
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.39.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> When messages expire in the store-and-forward queues, they go to the
> store-and-forward queue's expiry queue. It would be better if the messages go
> to the original queue's expiry queue.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact