This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d864780293 ARTEMIS-4684 Internal queues should not redistribute
d864780293 is described below

commit d86478029378ba7fbf5b5005f614b2323999bf6e
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 13 15:01:20 2024 -0400

    ARTEMIS-4684 Internal queues should not redistribute
---
 .../artemis/core/server/impl/QueueImpl.java        |  5 ++
 .../mirror/ClusteredMirrorSoakTest.java            | 83 +++++++++++++++-------
 2 files changed, 63 insertions(+), 25 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b7efc1c721..b7bb6676c1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1605,6 +1605,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          return;
       }
 
+      if 
(address.startsWith(server.getConfiguration().getManagementAddress())) {
+         logger.debug("Queue {} is a management address, ignoring it for 
redistribution", address);
+         return;
+      }
+
       clearRedistributorFuture();
 
       if (redistributor != null) {
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index a2f765036c..d4ae867009 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -29,6 +29,7 @@ import javax.jms.Topic;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -120,9 +121,9 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
    @Before
    public void cleanupServers() {
       cleanupData(DC1_NODE_A);
+      cleanupData(DC1_NODE_B);
       cleanupData(DC2_NODE_A);
       cleanupData(DC2_NODE_B);
-      cleanupData(DC2_NODE_B);
    }
 
 
@@ -227,6 +228,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
    private CountDownLatch startConsumer(Executor executor, ConnectionFactory 
factory, String queue, AtomicBoolean running, AtomicInteger errorCount, 
AtomicInteger receivedCount) {
       CountDownLatch done = new CountDownLatch(1);
 
+      HashSet<Integer> receivedMessages = new HashSet<>();
+
       executor.execute(() -> {
          try {
             try (Connection connection = factory.createConnection()) {
@@ -237,6 +240,11 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
                   Message message = consumer.receive(100);
                   if (message != null) {
                      receivedCount.incrementAndGet();
+                     Integer receivedI = message.getIntProperty("i");
+                     if (!receivedMessages.add(receivedI)) {
+                        errorCount.incrementAndGet();
+                        logger.warn("Message {}, isLarge={} received in 
duplicate", receivedI, message.getBooleanProperty("large"));
+                     }
                   }
                }
             }
@@ -302,6 +310,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       final int numberOfMessages = 50;
 
       ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
+      ConnectionFactory connectionFactoryDC2A = 
CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI);
       ConnectionFactory connectionFactoryDC2B = 
CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
 
       AtomicBoolean runningConsumers = new AtomicBoolean(true);
@@ -309,30 +318,54 @@ public class ClusteredMirrorSoakTest extends SoakTestBase 
{
       AtomicInteger errors = new AtomicInteger(0);
       AtomicInteger receiverCount = new AtomicInteger(0);
 
-      SimpleManagement simpleManagementDC1A = new 
SimpleManagement(DC1_NODEA_URI, null, null);
-      SimpleManagement simpleManagementDC1B = new 
SimpleManagement(DC1_NODEB_URI, null, null);
-      SimpleManagement simpleManagementDC2A = new 
SimpleManagement(DC2_NODEA_URI, null, null);
-      SimpleManagement simpleManagementDC2B = new 
SimpleManagement(DC2_NODEB_URI, null, null);
-
-      CountDownLatch doneDC2B = startConsumer(executorService, 
connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
-
-      sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
-
-      Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
-
-      Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
-      Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
-      Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
-      Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
-
-      Wait.assertEquals(0, () -> 
simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
-      Wait.assertEquals(0, () -> 
simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
-      Wait.assertEquals(0, () -> 
simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
-      Wait.assertEquals(0, () -> 
simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
-
-      runningConsumers.set(false);
-
-      Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+      try (SimpleManagement simpleManagementDC1A = new 
SimpleManagement(DC1_NODEA_URI, null, null);
+           SimpleManagement simpleManagementDC1B = new 
SimpleManagement(DC1_NODEB_URI, null, null);
+           SimpleManagement simpleManagementDC2A = new 
SimpleManagement(DC2_NODEA_URI, null, null);
+           SimpleManagement simpleManagementDC2B = new 
SimpleManagement(DC2_NODEB_URI, null, null)) {
+
+         Assert.assertFalse(findQueue(simpleManagementDC1A, queueName));
+         Assert.assertFalse(findQueue(simpleManagementDC1B, queueName));
+         Assert.assertFalse(findQueue(simpleManagementDC2A, queueName));
+         Assert.assertFalse(findQueue(simpleManagementDC2B, queueName));
+
+         // just to allow auto-creation to kick in....
+         Assert.assertTrue(startConsumer(executorService, 
connectionFactoryDC2A, queueName, new AtomicBoolean(false), errors, 
receiverCount).await(1, TimeUnit.MINUTES));
+         Assert.assertTrue(startConsumer(executorService, 
connectionFactoryDC2B, queueName, new AtomicBoolean(false), errors, 
receiverCount).await(1, TimeUnit.MINUTES));
+
+         Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+         sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
+
+         Wait.assertEquals(numberOfMessages, () -> 
simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(numberOfMessages, () -> 
simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
+
+         CountDownLatch doneDC2B = startConsumer(executorService, 
connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
+         Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
+
+         Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+         Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+         Wait.assertEquals(0, () -> 
simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC1B.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+         Wait.assertEquals(0, () -> 
simpleManagementDC2B.getMessageCountOnQueue(queueName), 5000);
+
+         runningConsumers.set(false);
+
+         Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+         Assert.assertEquals(0, errors.get());
+      }
    }
 
    @Test

Reply via email to