This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fdf2ea874b ARTEMIS-4733 Infinite mirror reflections after CreateAddress
fdf2ea874b is described below

commit fdf2ea874bd903aba446d0bca377793028ed56b4
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Fri Apr 19 14:50:11 2024 -0400

    ARTEMIS-4733 Infinite mirror reflections after CreateAddress
---
 .../api/core/management/SimpleManagement.java      |   4 +
 .../core/postoffice/impl/PostOfficeImpl.java       |   8 +-
 .../mirror/ClusteredMirrorSoakTest.java            | 186 ++++++++++++++++++---
 3 files changed, 168 insertions(+), 30 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
index 1b46ab264b..ece36b40b7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
@@ -118,6 +118,10 @@ public class SimpleManagement implements AutoCloseable {
       return simpleManagementLong(ResourceNames.QUEUE + queueName, 
"getMessageCount");
    }
 
+   /**
+    * Queries {@code getMessagesAdded} on the management resource of the given queue.
+    *
+    * @param queueName name of the queue, appended to {@link ResourceNames#QUEUE} to build the resource name
+    * @return the value returned by the management call (presumably the number of messages added to the queue so far)
+    * @throws Exception if the management request fails
+    */
+   public long getMessageAddedOnQueue(String queueName) throws Exception {
+      final String resourceName = ResourceNames.QUEUE + queueName;
+      return simpleManagementLong(resourceName, "getMessagesAdded");
+   }
+
    /**
     * Queries {@code getDeliveringCount} on the management resource of the given queue.
     *
     * @param queueName name of the queue, appended to {@link ResourceNames#QUEUE} to build the resource name
     * @return the value returned by the management call
     * @throws Exception if the management request fails
     */
    public int getDeliveringCountOnQueue(String queueName) throws Exception {
       final String resourceName = ResourceNames.QUEUE + queueName;
       return simpleManagementInt(resourceName, "getDeliveringCount");
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 752cb23cab..1e21bb2639 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -542,10 +542,6 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
             server.callBrokerAddressPlugins(plugin -> 
plugin.beforeAddAddress(addressInfo, reload));
          }
 
-         if (!reload && mirrorControllerSource != null) {
-            mirrorControllerSource.addAddress(addressInfo);
-         }
-
          boolean result;
          if (reload) {
             result = addressManager.reloadAddressInfo(addressInfo);
@@ -554,6 +550,10 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          }
          // only register address if it is new
          if (result) {
+            if (!reload && mirrorControllerSource != null) {
+               mirrorControllerSource.addAddress(addressInfo);
+            }
+
             try {
                managementService.registerAddress(addressInfo);
 
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index d4ae867009..95f54f5593 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -39,8 +39,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -49,8 +53,6 @@ import org.apache.activemq.artemis.utils.FileUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,18 +82,17 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
    Process processDC2_node_A;
    Process processDC2_node_B;
 
-
    private static String DC1_NODEA_URI = "tcp://localhost:61616";
    private static String DC1_NODEB_URI = "tcp://localhost:61617";
    private static String DC2_NODEA_URI = "tcp://localhost:61618";
    private static String DC2_NODEB_URI = "tcp://localhost:61619";
 
-   private static void createServer(String serverName, String connectionName, 
String clusterURI, String mirrorURI, int porOffset) throws Exception {
+   private static void createServer(String serverName, String connectionName, 
String clusterURI, String mirrorURI, int porOffset, boolean paging) throws 
Exception {
       File serverLocation = getFileServerLocation(serverName);
       deleteDirectory(serverLocation);
 
       HelperCreate cliCreateServer = new HelperCreate();
-      
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
       cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
       cliCreateServer.setClustered(true);
       cliCreateServer.setNoWeb(true);
@@ -115,24 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase 
{
       Assert.assertTrue(brokerXml.exists());
       // Adding redistribution delay to broker configuration
       Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
ClusteredMirrorSoakTest.java --> \n"));
+      if (paging) {
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, 
"<max-size-messages>-1</max-size-messages>", 
"<max-size-messages>1</max-size-messages>"));
+      }
    }
 
-
-   @Before
-   public void cleanupServers() {
-      cleanupData(DC1_NODE_A);
-      cleanupData(DC1_NODE_B);
-      cleanupData(DC2_NODE_A);
-      cleanupData(DC2_NODE_B);
-   }
-
-
-   @BeforeClass
-   public static void createServers() throws Exception {
-      createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
-      createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1);
-      createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2);
-      createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3);
+   public static void createRealServers(boolean paging) throws Exception {
+      createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0, 
paging);
+      createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1, 
paging);
+      createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2, 
paging);
+      createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3, 
paging);
    }
 
    private void startServers() throws Exception {
@@ -147,8 +140,147 @@ public class ClusteredMirrorSoakTest extends SoakTestBase 
{
       ServerUtil.waitForServerToStart(3, 10_000);
    }
 
+   /**
+    * Checks that mirror traffic does not keep bouncing between the data centers (ARTEMIS-4733).
+    *
+    * <p>Steps:
+    * <ol>
+    *    <li>Create the servers with the paging flag set and pre-create an internal address and queue
+    *        directly on the DC1 node A journal, using a temporary embedded server.</li>
+    *    <li>Start all brokers and verify that, with no load, the mirror store-and-forward (SNF) queue of
+    *        every node stays below 20 added messages.</li>
+    *    <li>Send and consume messages on a topic (two durable subscriptions) and on a queue through DC1 node A,
+    *        verifying the message counts on DC1 node A and DC2 node A.</li>
+    *    <li>Verify that afterwards the SNF counters on DC1 stop moving and the ones on DC2 stay below 20.</li>
+    * </ol>
+    *
+    * @throws Exception on any failure while creating, starting or querying the brokers
+    */
+   @Test
+   public void testAvoidReflections() throws Exception {
+      createRealServers(true);
+
+      String internalQueue = "INTERNAL_QUEUE";
+
+      // A temporary embedded server writes the internal address and queue into the DC1 node A data folders,
+      // so they are already present when the real broker is started.
+      ActiveMQServer tempServer = createServer(true);
+      tempServer.getConfiguration().setBindingsDirectory(getServerLocation(DC1_NODE_A) + "/data/bindings");
+      tempServer.getConfiguration().setJournalDirectory(getServerLocation(DC1_NODE_A) + "/data/journal");
+      tempServer.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      tempServer.start();
+      tempServer.addAddressInfo(new AddressInfo(internalQueue).addRoutingType(RoutingType.ANYCAST).setInternal(true));
+      tempServer.createQueue(new QueueConfiguration(internalQueue).setDurable(true).setRoutingType(RoutingType.ANYCAST).setInternal(true).setAddress(internalQueue));
+      tempServer.stop();
+
+      startServers();
+
+      // one management client per broker node
+      SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
+      SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
+
+      String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+      String queueName = "myQueue";
+      String topicName = "order";
+
+      for (int i = 0; i < 5; i++) {
+         logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));
+
+         // no load generated.. just initial queues should have been sent
+         Assert.assertTrue(simpleManagementDC1A.getMessageAddedOnQueue(snfQueue) < 20);
+         Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
+         Assert.assertTrue(simpleManagementDC1B.getMessageAddedOnQueue(snfQueue) < 20);
+         Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
+         Thread.sleep(100);
+      }
+
+      Assert.assertEquals(0, simpleManagementDC2A.getMessageCountOnQueue(queueName));
+      Assert.assertEquals(0, simpleManagementDC1A.getMessageCountOnQueue(internalQueue));
+      try {
+         simpleManagementDC2A.getMessageCountOnQueue(internalQueue);
+         Assert.fail("Exception expected");
+      } catch (Exception expected) {
+         // intentionally ignored: the query for the internal queue is expected to fail on DC2.
+         // Assert.fail throws an AssertionError, which is not caught here.
+      }
+
+      ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
+
+      int numberOfMessages = 1_000;
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         connection.setClientID("conn1");
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(topicName);
+         // the consumers are only created to establish the two durable subscriptions before sending
+         session.createDurableConsumer(topic, "hello1");
+         session.createDurableConsumer(topic, "hello2");
+
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numberOfMessages; i++) {
+            if (i % 100 == 0) {
+               logger.info("Sent topic {}", i);
+            }
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+
+      }
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < numberOfMessages; i++) {
+            if (i % 100 == 0) {
+               logger.info("Sent queue {}", i);
+            }
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+      }
+
+      // everything sent on DC1 node A must show up on both sides, for the queue and for both subscriptions
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello1"), 5000);
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello1"), 5000);
+      Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         connection.setClientID("conn1");
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(topicName);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer[] consumers = new MessageConsumer[] {session.createDurableSubscriber(topic, "hello1"), session.createDurableSubscriber(topic, "hello2"), session.createConsumer(queue)};
+
+         for (MessageConsumer c : consumers) {
+            for (int i = 0; i < numberOfMessages; i++) {
+               Assert.assertNotNull(c.receive(5000));
+               // commit in batches of 100 receives
+               if (i % 100 == 0) {
+                  session.commit();
+               }
+            }
+            session.commit();
+         }
+      }
+
+      // after consuming on DC1 node A, both sides must be drained
+      Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
+      Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
+      Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello1"), 5000);
+      Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
+      Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello1"), 5000);
+      Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
+
+      long countDC1A = simpleManagementDC1A.getMessageAddedOnQueue(snfQueue);
+      long countDC1B = simpleManagementDC1B.getMessageAddedOnQueue(snfQueue);
+
+      for (int i = 0; i < 10; i++) {
+         // DC1 should be quiet and nothing moving out of it
+         Assert.assertEquals(countDC1A, simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
+         Assert.assertEquals(countDC1B, simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
+
+         // DC2 is totally passive, nothing should have been generated
+         Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
+         Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
+         // we take intervals, allowing to make sure it doesn't grow
+         Thread.sleep(100);
+         logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
+         logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));
+      }
+   }
+
    @Test
    public void testSimpleQueue() throws Exception {
+      createRealServers(false);
       startServers();
 
       final int numberOfMessages = 200;
@@ -303,6 +435,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       runAfter(executorService::shutdownNow);
 
+      createRealServers(false);
       startServers();
 
       String queueName = "testqueue" + RandomUtil.randomString();
@@ -370,6 +503,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
 
    @Test
    public void testMirroredTopics() throws Exception {
+      createRealServers(false);
       startServers();
 
       final int numberOfMessages = 200;
@@ -389,8 +523,8 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       SimpleManagement simpleManagementDC1B = new 
SimpleManagement(DC1_NODEB_URI, null, null);
       SimpleManagement simpleManagementDC2B = new 
SimpleManagement(DC2_NODEB_URI, null, null);
 
-      consume(connectionFactoryDC1A, clientIDA, subscriptionID,  0, 0, false);
-      consume(connectionFactoryDC1B, clientIDB, subscriptionID,  0, 0, false);
+      consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false);
+      consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false);
 
       try (Connection connection = connectionFactoryDC1B.createConnection()) {
          Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
@@ -414,7 +548,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
       }
 
       logger.debug("Consuming from DC1B");
-      consume(connectionFactoryDC1B, clientIDB, subscriptionID,  0, 
numberOfMessages / 2, false);
+      consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 
numberOfMessages / 2, false);
 
       processDC2_node_B.destroyForcibly();
       processDC2_node_B.waitFor();
@@ -425,12 +559,12 @@ public class ClusteredMirrorSoakTest extends SoakTestBase 
{
 
       logger.debug("Consuming from DC2B with {}", 
simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"));
 
-      consume(connectionFactoryDC2B, clientIDB, subscriptionID,  
numberOfMessages / 2, numberOfMessages / 2, true);
+      consume(connectionFactoryDC2B, clientIDB, subscriptionID, 
numberOfMessages / 2, numberOfMessages / 2, true);
 
       Wait.assertEquals(0, () -> 
simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000);
 
       Wait.assertEquals(0, () -> 
simpleManagementDC1B.getMessageCountOnQueue("nodeB.my-order"), 10000);
-      consume(connectionFactoryDC1B, clientIDB, subscriptionID,  
numberOfMessages, 0, true);
+      consume(connectionFactoryDC1B, clientIDB, subscriptionID, 
numberOfMessages, 0, true);
       logger.debug("DC1B nodeB.my-order=0");
    }
 

Reply via email to