This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 11b7671960 ARTEMIS-4684 Internal Queues should not redistribute
11b7671960 is described below

commit 11b7671960e53cefcf1db7523660108d59ceaeb7
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 12 19:05:00 2024 -0400

    ARTEMIS-4684 Internal Queues should not redistribute
    
    This is particularly true for the Mirrored SNF queue. Redistribution is not 
meant for internal queues. If an internal queue happens to have the same name 
on another server, it should not trigger redistribution when consumers are 
removed.
    
    It would be possible to work around this by adding an address-setting 
specific to the address with redistribution disabled.
    
    ClusteredMirrorSoakTest was intermittently failing because of this. For a 
few seconds, while the mirror connection is still being made, connections could 
move messages from one node towards another node if both have a queue with the 
same name.
---
 .../connect/mirror/AMQPMirrorControllerSource.java |  4 ++++
 .../artemis/core/server/impl/QueueImpl.java        |  9 +++++++++
 .../mirror/ClusteredMirrorSoakTest.java            | 23 ++++++++++++++++------
 .../tests/unit/core/server/impl/QueueImplTest.java |  9 +++++++++
 4 files changed, 39 insertions(+), 6 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 2891f1f07d..48f1ef4343 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -170,6 +170,10 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       assert snfQueue != null;
       this.replicaConfig = replicaConfig;
       this.snfQueue = snfQueue;
+      if (!snfQueue.isInternalQueue()) {
+         logger.debug("marking queue {} as internal to avoid redistribution 
kicking in", snfQueue.getName());
+         snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
+      }
       this.server = server;
       this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
       this.addQueues = replicaConfig.isQueueCreation();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8dd7582917..b7efc1c721 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1594,8 +1594,17 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       return supports;
    }
 
+   public synchronized Redistributor getRedistributor() { // accessor used by QueueImplTest to check whether a redistributor is set
+      return redistributor == null ? null : redistributor.consumer; // null when no redistributor holder is currently set
+   }
+
    @Override
    public synchronized void addRedistributor(final long delay) {
+      if (isInternalQueue()) {
+         logger.debug("Queue {} is internal, can't be redistributed!", 
this.name);
+         return;
+      }
+
       clearRedistributorFuture();
 
       if (redistributor != null) {
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index 0964050f0c..a2f765036c 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.utils.FileUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -68,10 +69,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       largeBody = writer.toString();
    }
 
-   public static final String DC1_NODE_A = "mirror/DC1/A";
-   public static final String DC2_NODE_A = "mirror/DC2/A";
-   public static final String DC1_NODE_B = "mirror/DC1/B";
-   public static final String DC2_NODE_B = "mirror/DC2/B";
+   public static final String DC1_NODE_A = "ClusteredMirrorSoakTest/DC1/A";
+   public static final String DC2_NODE_A = "ClusteredMirrorSoakTest/DC2/A";
+   public static final String DC1_NODE_B = "ClusteredMirrorSoakTest/DC1/B";
+   public static final String DC2_NODE_B = "ClusteredMirrorSoakTest/DC2/B";
 
    Process processDC1_node_A;
    Process processDC1_node_B;
@@ -115,6 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
ClusteredMirrorSoakTest.java --> \n"));
    }
 
+
+   @Before
+   public void cleanupServers() { // runs before every test: calls cleanupData once for each of the four broker instances
+      cleanupData(DC1_NODE_A);
+      cleanupData(DC2_NODE_A);
+      cleanupData(DC1_NODE_B); // was a duplicated DC2_NODE_B, which left DC1/B uncleaned
+      cleanupData(DC2_NODE_B);
+   }
+
+
    @BeforeClass
    public static void createServers() throws Exception {
       createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
@@ -286,7 +297,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
 
       startServers();
 
-      String queueName = "queue" + RandomUtil.randomString();
+      String queueName = "testqueue" + RandomUtil.randomString();
 
       final int numberOfMessages = 50;
 
@@ -307,7 +318,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
 
       sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
 
-      Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
+      Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
 
       Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
       Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 4821e618c5..f9c2bc74ac 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1084,6 +1084,15 @@ public class QueueImplTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testNoRedistributorInternalQueue() throws Exception { // an internal queue must never get a redistributor
+      QueueImpl queue = getTemporaryQueue();
+      queue.setInternalQueue(true); // flag the queue as internal before asking for redistribution
+
+      queue.addRedistributor(0); // QueueImpl.addRedistributor returns early when isInternalQueue() is true
+      Assert.assertNull(queue.getRedistributor()); // so no redistributor may have been installed
+   }
+
    private void testConsumerWithFilters(final boolean direct) throws Exception 
{
       QueueImpl queue = getTemporaryQueue();
 

Reply via email to