This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d864780293 ARTEMIS-4684 Internal queues should not redistribute
d864780293 is described below
commit d86478029378ba7fbf5b5005f614b2323999bf6e
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 13 15:01:20 2024 -0400
ARTEMIS-4684 Internal queues should not redistribute
---
.../artemis/core/server/impl/QueueImpl.java | 5 ++
.../mirror/ClusteredMirrorSoakTest.java | 83 +++++++++++++++-------
2 files changed, 63 insertions(+), 25 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b7efc1c721..b7bb6676c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1605,6 +1605,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return;
}
+ if (address.startsWith(server.getConfiguration().getManagementAddress())) {
+ logger.debug("Queue {} is a management address, ignoring it for redistribution", address);
+ return;
+ }
+
clearRedistributorFuture();
if (redistributor != null) {
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index a2f765036c..d4ae867009 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -29,6 +29,7 @@ import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -120,9 +121,9 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
+ cleanupData(DC1_NODE_B);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
- cleanupData(DC2_NODE_B);
}
@@ -227,6 +228,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
private CountDownLatch startConsumer(Executor executor, ConnectionFactory factory, String queue, AtomicBoolean running, AtomicInteger errorCount, AtomicInteger receivedCount) {
CountDownLatch done = new CountDownLatch(1);
+ HashSet<Integer> receivedMessages = new HashSet<>();
+
executor.execute(() -> {
try {
try (Connection connection = factory.createConnection()) {
@@ -237,6 +240,11 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Message message = consumer.receive(100);
if (message != null) {
receivedCount.incrementAndGet();
+ Integer receivedI = message.getIntProperty("i");
+ if (!receivedMessages.add(receivedI)) {
+ errorCount.incrementAndGet();
+ logger.warn("Message {}, isLarge={} received in duplicate", receivedI, message.getBooleanProperty("large"));
+ }
}
}
}
@@ -302,6 +310,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
final int numberOfMessages = 50;
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
+ ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI);
ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
AtomicBoolean runningConsumers = new AtomicBoolean(true);
@@ -309,30 +318,54 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
AtomicInteger errors = new AtomicInteger(0);
AtomicInteger receiverCount = new AtomicInteger(0);
- SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
- SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
- SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
- SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
-
- CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
-
- sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
-
- Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
-
- Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
- Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
- Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
- Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
-
- Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
- Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
- Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
- Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
-
- runningConsumers.set(false);
-
- Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+ try (SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
+ SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
+ SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
+ SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null)) {
+
+ Assert.assertFalse(findQueue(simpleManagementDC1A, queueName));
+ Assert.assertFalse(findQueue(simpleManagementDC1B, queueName));
+ Assert.assertFalse(findQueue(simpleManagementDC2A, queueName));
+ Assert.assertFalse(findQueue(simpleManagementDC2B, queueName));
+
+ // just to allow auto-creation to kick in....
+ Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2A, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES));
+ Assert.assertTrue(startConsumer(executorService, connectionFactoryDC2B, queueName, new AtomicBoolean(false), errors, receiverCount).await(1, TimeUnit.MINUTES));
+
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+ sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
+
+ Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
+
+ CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
+ Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
+
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+ Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
+
+ runningConsumers.set(false);
+
+ Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+ }
}
@Test