This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 11b7671960 ARTEMIS-4684 Internal Queues should not redistribute
11b7671960 is described below
commit 11b7671960e53cefcf1db7523660108d59ceaeb7
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 12 19:05:00 2024 -0400
ARTEMIS-4684 Internal Queues should not redistribute
This is particularly true for the Mirrored SNF queue. Redistribution is not
meant for internal queues. If an internal queue happens to have the same name
on another server, it should not trigger redistribution when consumers are
removed.
It would be possible to work around this by adding an address-setting
specific to the address with redistribution disabled.
ClusteredMirrorSoakTest was intermittently failing because of this. For a
few seconds, while the mirror connection is still being made, redistribution
could move messages from one node towards another node if the queue has the
same name on both.
---
.../connect/mirror/AMQPMirrorControllerSource.java | 4 ++++
.../artemis/core/server/impl/QueueImpl.java | 9 +++++++++
.../mirror/ClusteredMirrorSoakTest.java | 23 ++++++++++++++++------
.../tests/unit/core/server/impl/QueueImplTest.java | 9 +++++++++
4 files changed, 39 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 2891f1f07d..48f1ef4343 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -170,6 +170,10 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
assert snfQueue != null;
this.replicaConfig = replicaConfig;
this.snfQueue = snfQueue;
+ if (!snfQueue.isInternalQueue()) {
+ logger.debug("marking queue {} as internal to avoid redistribution
kicking in", snfQueue.getName());
+ snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
+ }
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.addQueues = replicaConfig.isQueueCreation();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8dd7582917..b7efc1c721 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1594,8 +1594,17 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return supports;
}
+ /**
+ * Returns the consumer held by the current redistributor holder,
+ * or {@code null} when no redistributor is set on this queue.
+ */
+ public synchronized Redistributor getRedistributor() {
+ if (redistributor == null) {
+ return null;
+ }
+ return redistributor.consumer;
+ }
+
@Override
public synchronized void addRedistributor(final long delay) {
+ if (isInternalQueue()) {
+ logger.debug("Queue {} is internal, can't be redistributed!",
this.name);
+ return;
+ }
+
clearRedistributorFuture();
if (redistributor != null) {
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index 0964050f0c..a2f765036c 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -68,10 +69,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
largeBody = writer.toString();
}
- public static final String DC1_NODE_A = "mirror/DC1/A";
- public static final String DC2_NODE_A = "mirror/DC2/A";
- public static final String DC1_NODE_B = "mirror/DC1/B";
- public static final String DC2_NODE_B = "mirror/DC2/B";
+ public static final String DC1_NODE_A = "ClusteredMirrorSoakTest/DC1/A";
+ public static final String DC2_NODE_A = "ClusteredMirrorSoakTest/DC2/A";
+ public static final String DC1_NODE_B = "ClusteredMirrorSoakTest/DC1/B";
+ public static final String DC2_NODE_B = "ClusteredMirrorSoakTest/DC2/B";
Process processDC1_node_A;
Process processDC1_node_B;
@@ -115,6 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
ClusteredMirrorSoakTest.java --> \n"));
}
+
+ /**
+ * Runs before each test and calls {@code cleanupData} once for every one
+ * of the four broker instances used by this test (DC1 and DC2, nodes A and B).
+ */
+ @Before
+ public void cleanupServers() {
+ cleanupData(DC1_NODE_A);
+ cleanupData(DC2_NODE_A);
+ // DC1_NODE_B was previously skipped: DC2_NODE_B was listed twice instead.
+ cleanupData(DC1_NODE_B);
+ cleanupData(DC2_NODE_B);
+ }
+
+
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
@@ -286,7 +297,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
startServers();
- String queueName = "queue" + RandomUtil.randomString();
+ String queueName = "testqueue" + RandomUtil.randomString();
final int numberOfMessages = 50;
@@ -307,7 +318,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
- Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
+ Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 4821e618c5..f9c2bc74ac 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1084,6 +1084,15 @@ public class QueueImplTest extends ActiveMQTestBase {
}
}
+ /**
+ * Verifies that asking an internal queue to add a redistributor leaves
+ * the queue without one.
+ */
+ @Test
+ public void testNoRedistributorInternalQueue() throws Exception {
+ // Flag a fresh temporary queue as internal before requesting redistribution.
+ QueueImpl internalQueue = getTemporaryQueue();
+ internalQueue.setInternalQueue(true);
+ internalQueue.addRedistributor(0);
+
+ // The request must have been ignored.
+ Assert.assertNull(internalQueue.getRedistributor());
+ }
+
private void testConsumerWithFilters(final boolean direct) throws Exception
{
QueueImpl queue = getTemporaryQueue();