This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 063968bb4f ARTEMIS-4677 Validating AutoCreate with Mirror and Clustering
063968bb4f is described below

commit 063968bb4fe2c8563a3dc028e58873da03f1e32e
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Mar 7 13:29:46 2024 -0500

    ARTEMIS-4677 Validating AutoCreate with Mirror and Clustering
    
    There is no semantic change on this commit.
    
    I wrote a test to validate a scenario and this is to keep the test in the codebase.
---
 .../activemq/artemis/utils/RealServerTestBase.java |  17 +++
 ...cSoakTest.java => ClusteredMirrorSoakTest.java} | 130 ++++++++++++++++++++-
 .../mirror/InterruptedLargeMessageTest.java        |  12 +-
 3 files changed, 146 insertions(+), 13 deletions(-)

diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
index b31981f018..81858c11ef 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.MalformedURLException;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
@@ -61,6 +62,22 @@ public class RealServerTestBase extends ActiveMQTestBase {
 
    public static final String basedir = System.getProperty("basedir");
 
+   /**
+    * Search and replace strings on a file
+    *
+    * @param file file to be replaced
+    * @param find string expected to match
+    * @param replace string to be replaced
+    * @return true if the replacement was successful
+    * @throws Exception
+    */
+   public static boolean findReplace(File file, String find, String replace) 
throws Exception {
+      String original = Files.readString(file.toPath());
+      String newContent = original.replace(find, replace);
+      Files.writeString(file.toPath(), newContent);
+      return !original.equals(newContent);
+   }
+
    @After
    public void after() throws Exception {
       // close ServerLocators before killing the server otherwise they'll hang 
and delay test termination
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
similarity index 71%
rename from 
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
rename to 
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index d39fbf73ae..fd92be3380 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -29,11 +30,19 @@ import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.util.ServerUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
@@ -43,7 +52,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MirroredTopicSoakTest extends SoakTestBase {
+public class ClusteredMirrorSoakTest extends SoakTestBase {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -98,6 +107,11 @@ public class MirroredTopicSoakTest extends SoakTestBase {
       brokerProperties.put("largeMessageSync", "false");
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
       saveProperties(brokerProperties, brokerPropertiesFile);
+
+      File brokerXml = new File(serverLocation, "/etc/broker.xml");
+      Assert.assertTrue(brokerXml.exists());
+      // Adding redistribution delay to broker configuration
+      Assert.assertTrue(findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
ClusteredMirrorSoakTest.java --> \n"));
    }
 
    @BeforeClass
@@ -121,7 +135,7 @@ public class MirroredTopicSoakTest extends SoakTestBase {
    }
 
    @Test
-   public void testQueue() throws Exception {
+   public void testSimpleQueue() throws Exception {
       startServers();
 
       final int numberOfMessages = 200;
@@ -197,6 +211,118 @@ public class MirroredTopicSoakTest extends SoakTestBase {
       }
    }
 
+
+   private CountDownLatch startConsumer(Executor executor, ConnectionFactory 
factory, String queue, AtomicBoolean running, AtomicInteger errorCount, 
AtomicInteger receivedCount) {
+      CountDownLatch done = new CountDownLatch(1);
+
+      executor.execute(() -> {
+         try {
+            try (Connection connection = factory.createConnection()) {
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer consumer = 
session.createConsumer(session.createQueue(queue));
+               connection.start();
+               while (running.get()) {
+                  Message message = consumer.receive(100);
+                  if (message != null) {
+                     receivedCount.incrementAndGet();
+                  }
+               }
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            errorCount.incrementAndGet();
+         } finally {
+            done.countDown();
+         }
+
+      });
+
+      return done;
+   }
+
+   private boolean findQueue(SimpleManagement simpleManagement, String queue) {
+      try {
+         simpleManagement.getMessageCountOnQueue(queue);
+         return true;
+      } catch (Exception e) {
+         return false;
+      }
+   }
+
+   private void sendMessages(ConnectionFactory factory, String queueName, int 
messages, int commitInterval) throws Exception {
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < messages; i++) {
+            TextMessage message;
+            boolean large;
+            if (i % 1 == 2) {
+               message = session.createTextMessage(largeBody);
+               large = true;
+            } else {
+               message = session.createTextMessage(smallBody);
+               large = false;
+            }
+            message.setIntProperty("i", i);
+            message.setBooleanProperty("large", large);
+            producer.send(message);
+            if (i > 0 && i % commitInterval == 0) {
+               logger.debug("commit {}", i);
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+   }
+
+   @Test
+   public void testAutoCreateQueue() throws Exception {
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      runAfter(executorService::shutdownNow);
+
+      startServers();
+
+      String queueName = "queue" + RandomUtil.randomString();
+
+      final int numberOfMessages = 50;
+
+      ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
+      ConnectionFactory connectionFactoryDC2B = 
CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
+
+      AtomicBoolean runningConsumers = new AtomicBoolean(true);
+      runAfter(() -> runningConsumers.set(false));
+      AtomicInteger errors = new AtomicInteger(0);
+      AtomicInteger receiverCount = new AtomicInteger(0);
+
+      SimpleManagement simpleManagementDC1A = new 
SimpleManagement(DC1_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC1B = new 
SimpleManagement(DC1_NODEB_URI, null, null);
+      SimpleManagement simpleManagementDC2A = new 
SimpleManagement(DC2_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC2B = new 
SimpleManagement(DC2_NODEB_URI, null, null);
+
+      CountDownLatch doneDC2B = startConsumer(executorService, 
connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
+
+      sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
+
+      Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
+
+      Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+      Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+      Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+      Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+      Wait.assertEquals(0, () -> 
simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
+      Wait.assertEquals(0, () -> 
simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
+      Wait.assertEquals(0, () -> 
simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
+      Wait.assertEquals(0, () -> 
simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
+
+      runningConsumers.set(false);
+
+      Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+   }
+
    @Test
    public void testMirroredTopics() throws Exception {
       startServers();
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
index 8cffd6126a..88fa0098c0 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
@@ -27,8 +27,6 @@ import javax.jms.TextMessage;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -105,11 +103,6 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
       saveProperties(brokerProperties, brokerPropertiesFile);
 
-      Path configPath = new File(getServerLocation(serverName), 
"./etc/broker.xml").toPath();
-
-      String brokerXML = Files.readString(configPath);
-
-      // the SimpleMetricsPlugin needs to be added throught the XML
       String insert;
       {
          StringWriter insertWriter = new StringWriter();
@@ -130,10 +123,7 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
          insert = insertWriter.toString();
       }
 
-      brokerXML = brokerXML.replace("</core>", insert);
-      Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin"));
-
-      Files.writeString(configPath, brokerXML);
+      Assert.assertTrue(findReplace(new File(getServerLocation(serverName), 
"./etc/broker.xml"), "</core>", insert));
    }
 
    @BeforeClass

Reply via email to