This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 30c5e63e0b61d8a5bd1dcd2ade06bc7c96bb5fdb Author: Clebert Suconic <[email protected]> AuthorDate: Wed Sep 25 10:13:47 2024 -0400 ARTEMIS-5066 Disable Divert on Mirror Target --- .../connect/mirror/AMQPMirrorControllerTarget.java | 2 +- .../artemis/core/postoffice/impl/BindingsImpl.java | 8 +- .../artemis/core/server/RoutingContext.java | 4 + .../artemis/core/server/impl/DivertImpl.java | 11 +- .../core/server/impl/RoutingContextImpl.java | 12 + .../mirror/DivertSoakMirrorTest.java | 392 +++++++++++++++++++++ .../mirror/ReplicatedBothNodesMirrorTest.java | 19 +- .../test/resources/DivertSoakMirrorTest-divert.txt | 25 ++ 8 files changed, 462 insertions(+), 11 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 3517e0cdbd..27177f6ab3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -547,7 +547,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement routingContext.setTransaction(transaction); duplicateIDCache.addToCache(duplicateIDBytes, transaction); - routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY); + routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).disableDivert(); if (targetQueues != null) { targetQueuesRouting(message, routingContext, targetQueues); server.getPostOffice().processRoute(message, routingContext, false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index ec309f8a0f..95935e9959 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -368,7 +368,9 @@ public final class BindingsImpl implements Bindings { } final Filter filter = binding.getFilter(); if (filter == null || filter.match(message)) { - binding.getBindable().route(message, context); + if (!(binding instanceof DivertBinding && context.isDivertDisabled())) { + binding.getBindable().route(message, context); + } routed = true; } } @@ -415,7 +417,9 @@ public final class BindingsImpl implements Bindings { } if (nextBinding != null) { - nextBinding.route(message, context); + if (!(context.isDivertDisabled() && nextBinding instanceof DivertBinding)) { + nextBinding.route(message, context); + } } }); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 73d7ebf381..000ede4d37 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -41,6 +41,10 @@ public interface RoutingContext { MirrorOption getMirrorOption(); + boolean isDivertDisabled(); + + void disableDivert(); + void forEachDurable(Consumer<Queue> consumer); RoutingContext setMirrorOption(MirrorOption option); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index a279e2d123..7132bb6545 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -92,8 +92,10 @@ public class DivertImpl implements Divert { @Override public void route(final Message message, final RoutingContext context) throws Exception { - // We must make a copy of the message, otherwise things like returning credits to the page won't work - // properly on ack, since the original address will be overwritten + + if (logger.isTraceEnabled()) { + logger.trace("Routing message {} through context {}", message, context); + } for (SimpleString forwardAddress : forwardAddresses) { if (logger.isTraceEnabled()) { @@ -107,8 +109,13 @@ public class DivertImpl implements Divert { // Shouldn't copy if it's not routed anywhere else if (!forwardAddress.equals(context.getAddress(message))) { long id = storageManager.generateID(); + + // We must make a copy of the message, otherwise things like returning credits to the page won't work + // properly on ack, since the original address will be overwritten copy = message.copy(id); + logger.trace("Divert {} copied message {}", uniqueName, copy); + // This will set the original MessageId, and the original address copy.referenceOriginalMessage(message, this.getUniqueName()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index d4bcfcebe5..103057498f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -61,6 +61,8 @@ public class RoutingContextImpl implements RoutingContext { Boolean internalOnly = null; + boolean divertDisabled = false; + volatile int version; MirrorOption mirrorOption = MirrorOption.enabled; @@ -89,6 +91,16 @@ public class RoutingContextImpl implements RoutingContext { return mirrorOption; } + @Override + public boolean isDivertDisabled() { + return divertDisabled; + } + + @Override + public void disableDivert() { + divertDisabled = true; + } + @Override public boolean isMirrorDisabled() { return mirrorOption == MirrorOption.disabled; diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java new file mode 100644 index 0000000000..e9a6f18569 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.Enumeration; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.TestParameters; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class DivertSoakMirrorTest extends SoakTestBase { + + private static final String TEST_NAME = "DIVERT_MIRROR"; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + + private static String DIVERT_CONFIGURATION_FILE_LOCATION = "DivertSoakMirrorTest-divert.txt"; + + // Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's + private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "trace")); + private static final boolean REUSE_SERVERS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS", "false")); + + /* + * Time each consumer takes to process a message received to allow some messages accumulating. + * This sleep happens right before the commit. + */ + private static final String QUEUE_NAME_LIST = "queueTest,Div,Div.0,Div.1,Div.2"; + private static final String QUEUE_NAME = "queueTest"; + + public static final String DC1_NODE = "DivertSoakMirrorTest/DC1"; + public static final String DC2_NODE = "DivertSoakMirrorTest/DC2"; + + volatile Process processDC1; + volatile Process processDC2; + + @AfterEach + public void destroyServers() throws Exception { + if (processDC1 != null) { + processDC1.destroyForcibly(); + processDC1.waitFor(1, TimeUnit.MINUTES); + processDC1 = null; + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + } + + private static final String DC1_IP = "localhost:61616"; + private static final String DC2_IP = "localhost:61618"; + + private static String uri(String ip) { + return "tcp://" + ip; + } + + private void startServers() throws Exception { + processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties")); + ServerUtil.waitForServerToStart(2, 10_000); + ServerUtil.waitForServerToStart(0, 10_000); + + // Waiting both nodes to be connected + //Wait.assertTrue(() -> FileUtil.find(new File(getFileServerLocation(DC1_NODE), "log/artemis.log"), l -> l.contains("AMQ111003")), 5000, 100); + //Wait.assertTrue(() -> FileUtil.find(new File(getFileServerLocation(DC2_NODE), "log/artemis.log"), l -> l.contains("AMQ111003")), 5000, 100); + } + + private static void createMirroredServer(boolean paging, + String serverName, + String connectionName, + String mirrorURI, + int portOffset, + boolean exclusiveDivert) throws Exception { + File serverLocation = getFileServerLocation(serverName); + if (REUSE_SERVERS && serverLocation.exists()) { + deleteDirectory(new File(serverLocation, "data")); + return; + } + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--queues", QUEUE_NAME_LIST); + cliCreateServer.setPortOffset(portOffset); + cliCreateServer.setClustered(false); + + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("messageExpiryScanPeriod", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); + brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + if (paging) { + brokerProperties.put("addressSettings.#.maxSizeMessages", "50"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + brokerProperties.put("mirrorPageTransaction", "true"); + } + + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + + assertTrue(brokerXml.exists()); + assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>")); + assertTrue(FileUtil.findReplace(brokerXml, "amqpDuplicateDetection=true;", "amqpDuplicateDetection=true;ackManagerFlushTimeout=" + TimeUnit.MINUTES.toMillis(10) + ";")); + + installDivert(brokerXml, exclusiveDivert); + + if (TRACE_LOGS) { + replaceLogs(serverLocation); + } + + } + + private static void installDivert(File brokerXml, boolean exclusive) throws Exception { + String divertConfig = FileUtil.readFile(DivertSoakMirrorTest.class.getClassLoader().getResourceAsStream(DIVERT_CONFIGURATION_FILE_LOCATION)); + assertNotNull(divertConfig); + divertConfig = divertConfig.replaceAll("BOOLEAN_VALUE", String.valueOf(exclusive)); + assertTrue(FileUtil.findReplace(brokerXml, "</acceptors>", "</acceptors>\n" + divertConfig)); + } + + private static void replaceLogs(File serverLocation) throws Exception { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + "logger.divert.name=org.apache.activemq.artemis.core.server.impl.DivertImpl\n" + "logger.divert.level=TRACE\n" + "logger.target.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" + "logger.target.level=TRACE\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = INFO")); + } + + private static void replaceLogsOnServer2(File serverLocation) throws Exception { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + "logger.divert.name=org.apache.activemq.artemis.core.server.impl.DivertImpl\n" + "logger.divert.level=TRACE\n" + "logger.target.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource\n" + "logger.target.level=DEBUG\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = TRACE")); + } + + public static void createRealServers(boolean paging, boolean divert, boolean exclusiveDivert) throws Exception { + createMirroredServer(paging, DC1_NODE, "mirror", uri(DC2_IP), 0, exclusiveDivert); + createMirroredServer(paging, DC2_NODE, "mirror", uri(DC1_IP), 2, exclusiveDivert); + } + + @Test + public void testDivertsExclusive() throws Exception { + testDiverts(true, false); + } + + @Test + public void testDivertsExclusiveNoConsume() throws Exception { + testDiverts(true, false, false); + } + + @Test + public void testDivertsNonExclusive() throws Exception { + testDiverts(false, false); + } + + @Test + public void testDivertsExclusiveSwitchOver() throws Exception { + testDiverts(true, true); + } + + @Test + public void testDivertsNonExclusiveSwitchOver() throws Exception { + testDiverts(false, true); + } + + private void testDiverts(boolean exclusive, boolean switchOver) throws Exception { + testDiverts(exclusive, switchOver, true); + } + + private void testDiverts(boolean exclusive, boolean switchOver, boolean consume) throws Exception { + String protocol = "OPENWIRE"; + createRealServers(false, true, exclusive); + + SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null); + SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null); + + //replaceLogsOnServer2(getFileServerLocation(DC2_NODE)); + startServers(); + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, uri(DC1_IP)); + ConnectionFactory connectionFactoryConsume; + + if (switchOver) { + logger.info("Switching over consumption to DC2"); + connectionFactoryConsume = CFUtil.createConnectionFactory(protocol, uri(DC2_IP)); + } else { + connectionFactoryConsume = CFUtil.createConnectionFactory(protocol, uri(DC1_IP)); + } + + int numberOfDiverts = 3; + long messagesPerDivert = 100; + long messages = numberOfDiverts * messagesPerDivert; + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = session.createProducer(session.createQueue("Div")); + + for (int i = 0; i < messages; i++) { + TextMessage message = session.createTextMessage("i = " + i); + message.setIntProperty("div", (i % numberOfDiverts)); + message.setIntProperty("i", i); + producer.setPriority(RandomUtil.randomInterval(0, 9)); + producer.send(message); + } + session.commit(); + } + + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, SNF_QUEUE), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, SNF_QUEUE), 5000, 100); + + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, QUEUE_NAME), 5000, 100); + Wait.assertEquals(exclusive ? 0L : messagesPerDivert * numberOfDiverts, () -> getMessageCount(managementDC1, "Div"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC1, "Div.0"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC1, "Div.1"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC1, "Div.2"), 5000, 100); + + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, QUEUE_NAME), 5000, 100); + Wait.assertEquals(exclusive ? 0L : messagesPerDivert * numberOfDiverts, () -> getMessageCount(managementDC2, "Div"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC2, "Div.0"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC2, "Div.1"), 5000, 100); + Wait.assertEquals(messagesPerDivert, () -> getMessageCount(managementDC2, "Div.2"), 5000, 100); + + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE)); + + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE)); + + if (consume) { + try (Connection connection = connectionFactoryConsume.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + connection.start(); + + // the main divert.. as the diverts are non exclusive + try (MessageConsumer consumer = session.createConsumer(session.createQueue("Div"))) { + if (!exclusive) { + for (int i = 0; i < messagesPerDivert * numberOfDiverts; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + assertNotNull(message); + if (i % 100 == 0) { + logger.info("processed {}", i); + } + if (i == 0) { + checkProperties(connection, message); + } + session.commit(); + } + } + assertNull(consumer.receiveNoWait()); + } + session.commit(); + } + + for (int div = 0; div <= 2; div++) { + for (int i = 0; i < messagesPerDivert; i++) { + try (Connection connection = connectionFactoryConsume.createConnection()) { + try (Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + try (MessageConsumer consumer = session.createConsumer(session.createQueue("Div." + div))) { + connection.start(); + TextMessage message = (TextMessage) consumer.receive(5000); + if (i % 10 == 0) { + logger.info("i={} on div {}", i, div); + } + MessageProducer testProducer = session.createProducer(session.createQueue(QUEUE_NAME)); + testProducer.send(message); + assertNotNull(message); + session.commit(); + } + } + } + } + + try (Connection connection = connectionFactoryConsume.createConnection()) { + try (Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + try (MessageConsumer consumer = session.createConsumer(session.createQueue("Div." + div))) { + assertNull(consumer.receiveNoWait(), "div " + div + " received a message"); + } + session.commit(); + } + } + } + + try (Connection connection = connectionFactoryConsume.createConnection()) { + try (Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + try (MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME))) { + connection.start(); + for (int i = 0; i < messagesPerDivert * numberOfDiverts; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + assertNotNull(message); + if (i == 0) { + checkProperties(connection, message); + } + session.commit(); + } + assertNull(consumer.receiveNoWait()); + session.commit(); + } + } + } + + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, SNF_QUEUE), 30_000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, SNF_QUEUE), 30_000, 100); + + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, QUEUE_NAME), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.0"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.1"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.2"), 5000, 100); + + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, QUEUE_NAME), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.0"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.1"), 5000, 100); + Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.2"), 5000, 100); + } + + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE)); + } + + private void checkProperties(Connection connection, javax.jms.Message message) throws Exception { + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + MessageProducer producer = session.createProducer(temporaryQueue); + producer.send(message); + connection.start(); + MessageConsumer consumer = session.createConsumer(temporaryQueue); + javax.jms.Message receivedMessage = consumer.receive(5000); + assertNotNull(receivedMessage); + + // The cleanup for x-opt happens on server's side. + // we may receive if coming directly from a mirrored queue, + // however we should cleanup on the next send to avoid invalid IDs on the server + Enumeration propertyNames = receivedMessage.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String property = String.valueOf(propertyNames.nextElement()); + assertFalse(property.startsWith("x-opt")); + } + } + } + +} \ No newline at end of file diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java index 147b68251e..dd0ad73348 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java @@ -67,6 +67,9 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + // Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false")); private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 200); @@ -79,6 +82,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { * Time each consumer takes to process a message received to allow some messages accumulating. * This sleep happens right before the commit. */ + private static final String QUEUE_NAME_LIST = "queueTest,Div,Div.0,Div.1,Div.2"; private static final String QUEUE_NAME = "queueTest"; private static String body; @@ -180,7 +184,6 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { cliCreateServer.createServer(); Properties brokerProperties = new Properties(); - brokerProperties.put("messageExpiryScanPeriod", "1000"); brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); @@ -270,7 +273,6 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { File brokerPropertiesFile = new File(serverLocation, "broker.properties"); saveProperties(brokerProperties, brokerPropertiesFile); - File brokerXml = new File(serverLocation, "/etc/broker.xml"); assertTrue(brokerXml.exists()); assertTrue(FileUtil.findReplace(brokerXml, "amqpDuplicateDetection=true;", "amqpDuplicateDetection=true;ackManagerFlushTimeout=" + TimeUnit.MINUTES.toMillis(10) + ";")); @@ -376,7 +378,6 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { final int failbackAt = 700; final int lastKillAt = 1200; final int totalMessages = 1_800; - String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; try (Connection connection = connectionFactoryDC1A.createConnection()) { connection.start(); @@ -431,7 +432,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { session.commit(); } - Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue)); + Wait.assertEquals(0, () -> getMessageCount(managementDC1, SNF_QUEUE)); Wait.assertEquals(oddSend, () -> getMessageCount(managementDC1, QUEUE_NAME)); Wait.assertEquals(oddSend, () -> getMessageCount(managementDC2Backup, QUEUE_NAME)); @@ -451,10 +452,16 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { session.commit(); } - Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue)); - Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, snfQueue)); + Wait.assertEquals(0, () -> getMessageCount(managementDC1, SNF_QUEUE)); + Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, SNF_QUEUE)); Wait.assertEquals(0, () -> getMessageCount(managementDC1, QUEUE_NAME)); Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, QUEUE_NAME)); + + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_REPLICA_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_REPLICA_NODE)); + } @Test diff --git a/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt b/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt new file mode 100644 index 0000000000..d1c042403f --- /dev/null +++ b/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt @@ -0,0 +1,25 @@ + <diverts> + <divert name="Test.Div0"> + <address>Div</address> + <forwarding-address>Div.0</forwarding-address> + <filter string="div=0"/> + <exclusive>BOOLEAN_VALUE</exclusive> + </divert> + <divert name="Test.Div1"> + <address>Div</address> + <forwarding-address>Div.1</forwarding-address> + <filter string="div=1"/> + <exclusive>BOOLEAN_VALUE</exclusive> + </divert> + <divert name="Test.Div2"> + <address>Div</address> + <forwarding-address>Div.2</forwarding-address> + <filter string="div=2"/> + <exclusive>BOOLEAN_VALUE</exclusive> + </divert> + <divert name="Test.NonExisting"> + <address>Div</address> + <forwarding-address>Div.IDontExist</forwarding-address> + <exclusive>BOOLEAN_VALUE</exclusive> + </divert> + </diverts> \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
