This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new cbbaa7a8e0 NO-JIRA Mirror and Divert Smoke Test
cbbaa7a8e0 is described below
commit cbbaa7a8e0ff37712c422544226fe69ba0ee076f
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Jul 30 20:11:13 2024 -0400
NO-JIRA Mirror and Divert Smoke Test
I was doing some verification with Diverts and Mirror in the form of a
smoke test.
No issue was found, but I'm keeping the test.
---
.../brokerConnection/DivertQueueMirrorTest.java | 202 +++++++++++++++++++++
1 file changed, 202 insertions(+)
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java
new file mode 100644
index 0000000000..6a6428d0f8
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/DivertQueueMirrorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Smoke test combining diverts with AMQP mirroring between two brokers.
+ *
+ * <p>Two broker instances (DC1 and DC2) are created, each configured with two exclusive diverts
+ * that forward everything sent to {@code inputQueue} into {@code outQueue1} and {@code outQueue2},
+ * and each with an AMQP mirror connection pointing at the other broker. The test sends messages to
+ * {@code inputQueue} on DC1, checks that both out queues hold every message on both brokers,
+ * consumes everything on DC2 and finally checks that the queues on DC1 drain as well.
+ */
+public class DivertQueueMirrorTest extends SmokeTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /** Text payload (slightly above 20 KiB) used as the body of every message sent by the test. */
+   private static final String body;
+
+   static {
+      StringWriter writer = new StringWriter();
+      while (writer.getBuffer().length() < 20 * 1024) {
+         writer.append("This is a string ..... ");
+      }
+      body = writer.toString();
+   }
+
+   /** Comma separated list of queues passed to the server creation through {@code --queues}. */
+   private static final String CREATE_QUEUE = "outQueue1,outQueue2";
+
+   public static final String DC1_NODE_A = "DivertQueueMirrorTest/DC1";
+   public static final String DC2_NODE_A = "DivertQueueMirrorTest/DC2";
+
+   /** Queue inspected at the end of the test to confirm nothing is left pending for the mirror named "mirror". */
+   private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+   Process processDC1_node_A;
+   Process processDC2_node_A;
+
+   // DC1 is created with port offset 0 and DC2 with port offset 2 (see createServers()).
+   private static final String DC1_NODEA_URI = "tcp://localhost:61616";
+   private static final String DC2_NODEA_URI = "tcp://localhost:61618";
+
+   /**
+    * Creates a broker instance and writes the {@code broker.properties} file holding its divert,
+    * mirror and address-settings configuration.
+    *
+    * @param serverName     location (relative name) of the instance, also used as the broker name
+    * @param connectionName name of the AMQP broker connection used for the mirror
+    * @param mirrorURI      URI of the broker this instance connects to as a mirror
+    * @param portOffset     port offset applied to the created instance
+    * @throws Exception if the instance or its properties file cannot be created
+    */
+   private static void createServer(String serverName,
+                                    String connectionName,
+                                    String mirrorURI,
+                                    int portOffset) throws Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      // The original code called setNoWeb(true) and then setNoWeb(false); only the last value was effective.
+      cliCreateServer.setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setClustered(false);
+      // Each broker is named after its own instance (previously every broker was created with the DC1 name).
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", serverName);
+      cliCreateServer.addArgs("--queues", CREATE_QUEUE);
+      cliCreateServer.addArgs("--java-memory", "512M");
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+
+      // First divert: inputQueue -> outQueue1
+      brokerProperties.put("divertConfigurations.myDivert.routingName", "inputQueue");
+      brokerProperties.put("divertConfigurations.myDivert.address", "inputQueue");
+      brokerProperties.put("divertConfigurations.myDivert.forwardingAddress", "outQueue1");
+      brokerProperties.put("divertConfigurations.myDivert.exclusive", "true");
+
+      // Second divert: inputQueue -> outQueue2
+      brokerProperties.put("divertConfigurations.myDivert2.routingName", "inputQueue");
+      brokerProperties.put("divertConfigurations.myDivert2.address", "inputQueue");
+      brokerProperties.put("divertConfigurations.myDivert2.forwardingAddress", "outQueue2");
+      brokerProperties.put("divertConfigurations.myDivert2.exclusive", "true");
+
+      // Mirror connection towards the other broker
+      String connectionPrefix = "AMQPConnections." + connectionName;
+      brokerProperties.put(connectionPrefix + ".uri", mirrorURI);
+      brokerProperties.put(connectionPrefix + ".retryInterval", "1000");
+      brokerProperties.put(connectionPrefix + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put(connectionPrefix + ".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+
+      brokerProperties.put("addressSettings.#.maxSizeBytes", Integer.toString(100 * 1024 * 1024));
+      brokerProperties.put("addressSettings.#.addressFullMessagePolicy", "PAGING");
+
+      File brokerPropertiesFile = new File(serverLocation, "broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+   }
+
+   /** Creates both broker instances once for the class, each one mirroring to the other. */
+   @BeforeAll
+   public static void createServers() throws Exception {
+      createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
+      createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
+   }
+
+   /** Cleans up the data of both broker instances before each test. */
+   @BeforeEach
+   public void cleanupServers() {
+      cleanupData(DC1_NODE_A);
+      cleanupData(DC2_NODE_A);
+   }
+
+   /**
+    * Sends messages to {@code inputQueue} on DC1 and validates that the diverted messages show up
+    * on {@code outQueue1} and {@code outQueue2} of both brokers, then consumes everything on DC2
+    * and validates that the queues on DC1 and the mirror SNF queues become empty.
+    */
+   @Test
+   @Timeout(value = 240_000L, unit = TimeUnit.MILLISECONDS)
+   public void testDivertAndMirror() throws Exception {
+      // No need to run this test using multiple protocols: it is about validating diverts combined with mirroring.
+      String protocol = "AMQP";
+
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
+      processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
+      ServerUtil.waitForServerToStart(0, 10_000);
+      ServerUtil.waitForServerToStart(2, 10_000);
+
+      final int numberOfMessages = 500;
+      final int commitInterval = 100;
+
+      ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
+      ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
+
+      SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
+      SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
+
+      // Produce on DC1, committing in batches.
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         connection.start();
+         Queue queue = session.createQueue("inputQueue");
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage(body));
+            if (i > 0 && i % commitInterval == 0) {
+               session.commit();
+            }
+         }
+         // commits whatever is left from the last (partial) batch
+         session.commit();
+      }
+
+      // Both diverts must have received every message on DC1, and nothing may stay on inputQueue.
+      Wait.assertEquals((long) numberOfMessages, () -> getMessageCount(simpleManagementDC1A, "outQueue1"), 60_000, 500);
+      Wait.assertEquals((long) numberOfMessages, () -> getMessageCount(simpleManagementDC1A, "outQueue2"), 60_000, 500);
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, "inputQueue"), 60_000, 500);
+
+      // The same state is expected on DC2.
+      Wait.assertEquals((long) numberOfMessages, () -> getMessageCount(simpleManagementDC2A, "outQueue1"), 60_000, 500);
+      Wait.assertEquals((long) numberOfMessages, () -> getMessageCount(simpleManagementDC2A, "outQueue2"), 60_000, 500);
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC2A, "inputQueue"), 60_000, 500);
+
+      // Consume everything from both out queues on DC2.
+      try (Connection connection = connectionFactoryDC2A.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         for (int q = 1; q <= 2; q++) {
+            Queue queue = session.createQueue("outQueue" + q);
+
+            try (MessageConsumer consumer = session.createConsumer(queue)) {
+               logger.info("Consuming from queue {}", queue);
+
+               for (int i = 0; i < numberOfMessages; i++) {
+                  TextMessage message = (TextMessage) consumer.receive(5000);
+                  Assertions.assertNotNull(message, "expecting message on queue " + queue);
+                  if (i > 0 && i % commitInterval == 0) {
+                     logger.info("Received {}, queue={}", i, queue);
+                     session.commit();
+                  }
+               }
+               session.commit();
+            }
+         }
+      }
+
+      // Nothing may be left pending on the mirror SNF queues, and the queues on DC1 must drain
+      // after the messages were consumed on DC2.
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE), 240_000, 500);
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC2A, SNF_QUEUE), 240_000, 500);
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, "outQueue1"), 60_000, 500);
+      Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, "outQueue2"), 60_000, 500);
+   }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact