This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 063968bb4f ARTEMIS-4677 Validating AutoCreate with Mirror and
Clustering
063968bb4f is described below
commit 063968bb4fe2c8563a3dc028e58873da03f1e32e
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Mar 7 13:29:46 2024 -0500
ARTEMIS-4677 Validating AutoCreate with Mirror and Clustering
There is no semantic change in this commit.
I wrote a test to validate a scenario, and this commit keeps that test in the
codebase.
---
.../activemq/artemis/utils/RealServerTestBase.java | 17 +++
...cSoakTest.java => ClusteredMirrorSoakTest.java} | 130 ++++++++++++++++++++-
.../mirror/InterruptedLargeMessageTest.java | 12 +-
3 files changed, 146 insertions(+), 13 deletions(-)
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
index b31981f018..81858c11ef 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
+import java.nio.file.Files;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
@@ -61,6 +62,22 @@ public class RealServerTestBase extends ActiveMQTestBase {
public static final String basedir = System.getProperty("basedir");
+ /**
+ * Replaces every literal occurrence of {@code find} in a file and rewrites the file in place
+ *
+ * @param file file whose content is read and written back
+ * @param find literal text to search for (not a regular expression)
+ * @param replace text substituted for each occurrence of {@code find}
+ * @return true if the file content changed, false if it stayed the same
+ * @throws Exception if the file cannot be read or written
+ */
+ public static boolean findReplace(File file, String find, String replace)
throws Exception {
+ String original = Files.readString(file.toPath());
+ String newContent = original.replace(find, replace);
+ Files.writeString(file.toPath(), newContent);
+ return !original.equals(newContent);
+ }
+
@After
public void after() throws Exception {
// close ServerLocators before killing the server otherwise they'll hang
and delay test termination
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
similarity index 71%
rename from
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
rename to
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
index d39fbf73ae..fd92be3380 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MirroredTopicSoakTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ClusteredMirrorSoakTest.java
@@ -19,6 +19,7 @@ package
org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -29,11 +30,19 @@ import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
@@ -43,7 +52,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MirroredTopicSoakTest extends SoakTestBase {
+public class ClusteredMirrorSoakTest extends SoakTestBase {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -98,6 +107,11 @@ public class MirroredTopicSoakTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ Assert.assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ Assert.assertTrue(findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
ClusteredMirrorSoakTest.java --> \n"));
}
@BeforeClass
@@ -121,7 +135,7 @@ public class MirroredTopicSoakTest extends SoakTestBase {
}
@Test
- public void testQueue() throws Exception {
+ public void testSimpleQueue() throws Exception {
startServers();
final int numberOfMessages = 200;
@@ -197,6 +211,118 @@ public class MirroredTopicSoakTest extends SoakTestBase {
}
}
+
+ /**
+ * Submits a task to the executor that consumes from the given queue until
+ * {@code running} is set to false.
+ *
+ * @param executor executor that runs the consuming task
+ * @param factory factory used to create the consumer's connection
+ * @param queue name of the queue to consume from
+ * @param running flag checked before every receive; the task ends once it is false
+ * @param errorCount incremented when the task fails with an exception
+ * @param receivedCount incremented for every message received
+ * @return latch counted down when the task ends, whether it failed or not
+ */
+ private CountDownLatch startConsumer(Executor executor, ConnectionFactory
factory, String queue, AtomicBoolean running, AtomicInteger errorCount,
AtomicInteger receivedCount) {
+ CountDownLatch done = new CountDownLatch(1);
+
+ executor.execute(() -> {
+ try {
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queue));
+ connection.start();
+ while (running.get()) {
+ // bounded receive so the running flag is re-checked even when no message arrives
+ Message message = consumer.receive(100);
+ if (message != null) {
+ receivedCount.incrementAndGet();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errorCount.incrementAndGet();
+ } finally {
+ // always release the latch so a waiting caller is not blocked by a failed task
+ done.countDown();
+ }
+
+ });
+
+ return done;
+ }
+
+ /**
+ * Probes for a queue by asking the management client for its message count.
+ *
+ * @param simpleManagement management client used for the probe
+ * @param queue name of the queue to look for
+ * @return true if the message-count call completed, false if it threw any exception
+ */
+ private boolean findQueue(SimpleManagement simpleManagement, String queue) {
+ try {
+ // the returned count is ignored; only the success of the call matters
+ simpleManagement.getMessageCountOnQueue(queue);
+ return true;
+ } catch (Exception e) {
+ // NOTE(review): any failure of the management call, not only a missing queue, yields false
+ return false;
+ }
+ }
+
+ /**
+ * Sends text messages to a queue over a transacted session, using the small
+ * body for even indexes and the large body for odd indexes. Every message
+ * carries an int property "i" with its index and a boolean property "large"
+ * telling which body was used.
+ *
+ * @param factory factory used to create the producer's connection
+ * @param queueName name of the destination queue
+ * @param messages number of messages to send
+ * @param commitInterval the session is committed whenever the index is a
+ * positive multiple of this value; must not be zero
+ * @throws Exception if connecting, sending or committing fails
+ */
+ private void sendMessages(ConnectionFactory factory, String queueName, int
messages, int commitInterval) throws Exception {
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0; i < messages; i++) {
+ TextMessage message;
+ boolean large;
+ // i % 1 is always 0, so the check has to be on i % 2 for the large branch to be reachable
+ if (i % 2 == 1) {
+ message = session.createTextMessage(largeBody);
+ large = true;
+ } else {
+ message = session.createTextMessage(smallBody);
+ large = false;
+ }
+ message.setIntProperty("i", i);
+ message.setBooleanProperty("large", large);
+ producer.send(message);
+ if (i > 0 && i % commitInterval == 0) {
+ logger.debug("commit {}", i);
+ session.commit();
+ }
+ }
+ // commit whatever was sent after the last interval boundary
+ session.commit();
+ }
+ }
+
+ /**
+ * Starts a consumer on DC2 node B for a randomly named queue, sends messages
+ * through DC1 node A and waits until the consumer has received all of them.
+ * It then checks that the queue can be queried through the management
+ * connection of all four nodes and that each reports a delivering count of zero.
+ */
+ @Test
+ public void testAutoCreateQueue() throws Exception {
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ runAfter(executorService::shutdownNow);
+
+ startServers();
+
+ // the name is random, so presumably no node has this queue pre-configured — TODO confirm
+ String queueName = "queue" + RandomUtil.randomString();
+
+ final int numberOfMessages = 50;
+
+ ConnectionFactory connectionFactoryDC1A =
CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
+ ConnectionFactory connectionFactoryDC2B =
CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
+
+ AtomicBoolean runningConsumers = new AtomicBoolean(true);
+ runAfter(() -> runningConsumers.set(false));
+ // NOTE(review): errors is incremented by the consumer task but never asserted in this test
+ AtomicInteger errors = new AtomicInteger(0);
+ AtomicInteger receiverCount = new AtomicInteger(0);
+
+ SimpleManagement simpleManagementDC1A = new
SimpleManagement(DC1_NODEA_URI, null, null);
+ SimpleManagement simpleManagementDC1B = new
SimpleManagement(DC1_NODEB_URI, null, null);
+ SimpleManagement simpleManagementDC2A = new
SimpleManagement(DC2_NODEA_URI, null, null);
+ SimpleManagement simpleManagementDC2B = new
SimpleManagement(DC2_NODEB_URI, null, null);
+
+ // the consumer is started before anything is sent
+ CountDownLatch doneDC2B = startConsumer(executorService,
connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
+
+ sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
+
+ Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
+
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
+ Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
+
+ Wait.assertEquals(0, () ->
simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () ->
simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () ->
simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
+ Wait.assertEquals(0, () ->
simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
+
+ // stop the consumer loop and wait for its task to finish
+ runningConsumers.set(false);
+
+ Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
+ }
+
@Test
public void testMirroredTopics() throws Exception {
startServers();
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
index 8cffd6126a..88fa0098c0 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
@@ -27,8 +27,6 @@ import javax.jms.TextMessage;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -105,11 +103,6 @@ public class InterruptedLargeMessageTest extends
SoakTestBase {
File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
- Path configPath = new File(getServerLocation(serverName),
"./etc/broker.xml").toPath();
-
- String brokerXML = Files.readString(configPath);
-
- // the SimpleMetricsPlugin needs to be added throught the XML
String insert;
{
StringWriter insertWriter = new StringWriter();
@@ -130,10 +123,7 @@ public class InterruptedLargeMessageTest extends
SoakTestBase {
insert = insertWriter.toString();
}
- brokerXML = brokerXML.replace("</core>", insert);
- Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin"));
-
- Files.writeString(configPath, brokerXML);
+ Assert.assertTrue(findReplace(new File(getServerLocation(serverName),
"./etc/broker.xml"), "</core>", insert));
}
@BeforeClass