This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 30c5e63e0b61d8a5bd1dcd2ade06bc7c96bb5fdb
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 25 10:13:47 2024 -0400

    ARTEMIS-5066 Disable Divert on Mirror Target
---
 .../connect/mirror/AMQPMirrorControllerTarget.java |   2 +-
 .../artemis/core/postoffice/impl/BindingsImpl.java |   8 +-
 .../artemis/core/server/RoutingContext.java        |   4 +
 .../artemis/core/server/impl/DivertImpl.java       |  11 +-
 .../core/server/impl/RoutingContextImpl.java       |  12 +
 .../mirror/DivertSoakMirrorTest.java               | 392 +++++++++++++++++++++
 .../mirror/ReplicatedBothNodesMirrorTest.java      |  19 +-
 .../test/resources/DivertSoakMirrorTest-divert.txt |  25 ++
 8 files changed, 462 insertions(+), 11 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 3517e0cdbd..27177f6ab3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -547,7 +547,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
       routingContext.setTransaction(transaction);
       duplicateIDCache.addToCache(duplicateIDBytes, transaction);
 
-      
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY);
+      
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).disableDivert();
       if (targetQueues != null) {
          targetQueuesRouting(message, routingContext, targetQueues);
          server.getPostOffice().processRoute(message, routingContext, false);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index ec309f8a0f..95935e9959 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -368,7 +368,9 @@ public final class BindingsImpl implements Bindings {
          }
          final Filter filter = binding.getFilter();
          if (filter == null || filter.match(message)) {
-            binding.getBindable().route(message, context);
+            if (!(binding instanceof DivertBinding && 
context.isDivertDisabled())) {
+               binding.getBindable().route(message, context);
+            }
             routed = true;
          }
       }
@@ -415,7 +417,9 @@ public final class BindingsImpl implements Bindings {
          }
 
          if (nextBinding != null) {
-            nextBinding.route(message, context);
+            if (!(context.isDivertDisabled() && nextBinding instanceof 
DivertBinding)) {
+               nextBinding.route(message, context);
+            }
          }
       });
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 73d7ebf381..000ede4d37 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -41,6 +41,10 @@ public interface RoutingContext {
 
    MirrorOption getMirrorOption();
 
+   boolean isDivertDisabled();
+
+   void disableDivert();
+
    void forEachDurable(Consumer<Queue> consumer);
 
    RoutingContext setMirrorOption(MirrorOption option);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index a279e2d123..7132bb6545 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -92,8 +92,10 @@ public class DivertImpl implements Divert {
 
    @Override
    public void route(final Message message, final RoutingContext context) 
throws Exception {
-      // We must make a copy of the message, otherwise things like returning 
credits to the page won't work
-      // properly on ack, since the original address will be overwritten
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("Routing message {} through context {}", message, 
context);
+      }
 
       for (SimpleString forwardAddress : forwardAddresses) {
          if (logger.isTraceEnabled()) {
@@ -107,8 +109,13 @@ public class DivertImpl implements Divert {
          // Shouldn't copy if it's not routed anywhere else
          if (!forwardAddress.equals(context.getAddress(message))) {
             long id = storageManager.generateID();
+
+            // We must make a copy of the message, otherwise things like 
returning credits to the page won't work
+            // properly on ack, since the original address will be overwritten
             copy = message.copy(id);
 
+            logger.trace("Divert {} copied message {}", uniqueName, copy);
+
             // This will set the original MessageId, and the original address
             copy.referenceOriginalMessage(message, this.getUniqueName());
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index d4bcfcebe5..103057498f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -61,6 +61,8 @@ public class RoutingContextImpl implements RoutingContext {
 
    Boolean internalOnly = null;
 
+   boolean divertDisabled = false;
+
    volatile int version;
 
    MirrorOption mirrorOption = MirrorOption.enabled;
@@ -89,6 +91,16 @@ public class RoutingContextImpl implements RoutingContext {
       return mirrorOption;
    }
 
+   @Override
+   public boolean isDivertDisabled() {
+      return divertDisabled;
+   }
+
+   @Override
+   public void disableDivert() {
+      divertDisabled = true;
+   }
+
    @Override
    public boolean isMirrorDisabled() {
       return mirrorOption == MirrorOption.disabled;
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java
new file mode 100644
index 0000000000..e9a6f18569
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/DivertSoakMirrorTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class DivertSoakMirrorTest extends SoakTestBase {
+
+   private static final String TEST_NAME = "DIVERT_MIRROR";
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+   private static String DIVERT_CONFIGURATION_FILE_LOCATION = 
"DivertSoakMirrorTest-divert.txt";
+
+   // Set this to true and log4j will be configured with TRACE logging for 
DivertImpl and AMQPMirrorControllerTarget on the servers
+   private static final boolean TRACE_LOGS = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", 
"trace"));
+   private static final boolean REUSE_SERVERS = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS", 
"false"));
+
+   /*
+    * Queues created on each broker: the plain test queue, the diverted address "Div"
+    * and the forwarding targets Div.0, Div.1 and Div.2 used by the configured diverts.
+    */
+   private static final String QUEUE_NAME_LIST = 
"queueTest,Div,Div.0,Div.1,Div.2";
+   private static final String QUEUE_NAME = "queueTest";
+
+   public static final String DC1_NODE = "DivertSoakMirrorTest/DC1";
+   public static final String DC2_NODE = "DivertSoakMirrorTest/DC2";
+
+   volatile Process processDC1;
+   volatile Process processDC2;
+
+   @AfterEach
+   public void destroyServers() throws Exception {
+      if (processDC1 != null) {
+         processDC1.destroyForcibly();
+         processDC1.waitFor(1, TimeUnit.MINUTES);
+         processDC1 = null;
+      }
+      if (processDC2 != null) {
+         processDC2.destroyForcibly();
+         processDC2.waitFor(1, TimeUnit.MINUTES);
+         processDC2 = null;
+      }
+   }
+
+   private static final String DC1_IP = "localhost:61616";
+   private static final String DC2_IP = "localhost:61618";
+
+   private static String uri(String ip) {
+      return "tcp://" + ip;
+   }
+
+   private void startServers() throws Exception {
+      processDC2 = startServer(DC2_NODE, -1, -1, new 
File(getServerLocation(DC2_NODE), "broker.properties"));
+      processDC1 = startServer(DC1_NODE, -1, -1, new 
File(getServerLocation(DC1_NODE), "broker.properties"));
+      ServerUtil.waitForServerToStart(2, 10_000);
+      ServerUtil.waitForServerToStart(0, 10_000);
+
+      // Waiting for both nodes to be connected (the log checks below are currently commented out)
+      //Wait.assertTrue(() -> FileUtil.find(new 
File(getFileServerLocation(DC1_NODE), "log/artemis.log"), l -> 
l.contains("AMQ111003")), 5000, 100);
+      //Wait.assertTrue(() -> FileUtil.find(new 
File(getFileServerLocation(DC2_NODE), "log/artemis.log"), l -> 
l.contains("AMQ111003")), 5000, 100);
+   }
+
+   private static void createMirroredServer(boolean paging,
+                                            String serverName,
+                                            String connectionName,
+                                            String mirrorURI,
+                                            int portOffset,
+                                            boolean exclusiveDivert) throws 
Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      if (REUSE_SERVERS && serverLocation.exists()) {
+         deleteDirectory(new File(serverLocation, "data"));
+         return;
+      }
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE);
+      cliCreateServer.addArgs("--queues", QUEUE_NAME_LIST);
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.setClustered(false);
+
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("messageExpiryScanPeriod", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", 
mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + 
".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+
+      if (paging) {
+         brokerProperties.put("addressSettings.#.maxSizeMessages", "50");
+         brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+         brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+         brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+         brokerProperties.put("mirrorPageTransaction", "true");
+      }
+
+      File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+
+      File brokerXml = new File(serverLocation, "/etc/broker.xml");
+
+      assertTrue(brokerXml.exists());
+      assertTrue(FileUtil.findReplace(brokerXml, 
"<page-size-bytes>10M</page-size-bytes>", 
"<page-size-bytes>100K</page-size-bytes>"));
+      assertTrue(FileUtil.findReplace(brokerXml, 
"amqpDuplicateDetection=true;", 
"amqpDuplicateDetection=true;ackManagerFlushTimeout=" + 
TimeUnit.MINUTES.toMillis(10) + ";"));
+
+      installDivert(brokerXml, exclusiveDivert);
+
+      if (TRACE_LOGS) {
+         replaceLogs(serverLocation);
+      }
+
+   }
+
+   private static void installDivert(File brokerXml, boolean exclusive) throws 
Exception {
+      String divertConfig = 
FileUtil.readFile(DivertSoakMirrorTest.class.getClassLoader().getResourceAsStream(DIVERT_CONFIGURATION_FILE_LOCATION));
+      assertNotNull(divertConfig);
+      divertConfig = divertConfig.replaceAll("BOOLEAN_VALUE", 
String.valueOf(exclusive));
+      assertTrue(FileUtil.findReplace(brokerXml, "</acceptors>", 
"</acceptors>\n" + divertConfig));
+   }
+
+   private static void replaceLogs(File serverLocation) throws Exception {
+      File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      assertTrue(FileUtil.findReplace(log4j, 
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + 
"logger.divert.name=org.apache.activemq.artemis.core.server.impl.DivertImpl\n" 
+ "logger.divert.level=TRACE\n" + 
"logger.target.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n"
 + "logger.target.level=TRACE\n" + "appender.console.filter.threshold.type = 
ThresholdFilter\n" + "appender.console.filter.threshold.level = INFO"));
+   }
+
+   private static void replaceLogsOnServer2(File serverLocation) throws 
Exception {
+      File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      assertTrue(FileUtil.findReplace(log4j, 
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + 
"logger.divert.name=org.apache.activemq.artemis.core.server.impl.DivertImpl\n" 
+ "logger.divert.level=TRACE\n" + 
"logger.target.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource\n"
 + "logger.target.level=DEBUG\n" + "appender.console.filter.threshold.type = 
ThresholdFilter\n" + "appender.console.filter.threshold.level = TRACE"));
+   }
+
+   public static void createRealServers(boolean paging, boolean divert, 
boolean exclusiveDivert) throws Exception {
+      createMirroredServer(paging, DC1_NODE, "mirror", uri(DC2_IP), 0, 
exclusiveDivert);
+      createMirroredServer(paging, DC2_NODE, "mirror", uri(DC1_IP), 2, 
exclusiveDivert);
+   }
+
+   @Test
+   public void testDivertsExclusive() throws Exception {
+      testDiverts(true, false);
+   }
+
+   @Test
+   public void testDivertsExclusiveNoConsume() throws Exception {
+      testDiverts(true, false, false);
+   }
+
+   @Test
+   public void testDivertsNonExclusive() throws Exception {
+      testDiverts(false, false);
+   }
+
+   @Test
+   public void testDivertsExclusiveSwitchOver() throws Exception {
+      testDiverts(true, true);
+   }
+
+   @Test
+   public void testDivertsNonExclusiveSwitchOver() throws Exception {
+      testDiverts(false, true);
+   }
+
+   private void testDiverts(boolean exclusive, boolean switchOver) throws 
Exception {
+      testDiverts(exclusive, switchOver, true);
+   }
+
+   private void testDiverts(boolean exclusive, boolean switchOver, boolean 
consume) throws Exception {
+      String protocol = "OPENWIRE";
+      createRealServers(false, true, exclusive);
+
+      SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, 
null);
+      SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, 
null);
+
+      //replaceLogsOnServer2(getFileServerLocation(DC2_NODE));
+      startServers();
+
+      ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory(protocol, uri(DC1_IP));
+      ConnectionFactory connectionFactoryConsume;
+
+      if (switchOver) {
+         logger.info("Switching over consumption to DC2");
+         connectionFactoryConsume = CFUtil.createConnectionFactory(protocol, 
uri(DC2_IP));
+      } else {
+         connectionFactoryConsume = CFUtil.createConnectionFactory(protocol, 
uri(DC1_IP));
+      }
+
+      int numberOfDiverts = 3;
+      long messagesPerDivert = 100;
+      long messages = numberOfDiverts * messagesPerDivert;
+
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+         MessageProducer producer = 
session.createProducer(session.createQueue("Div"));
+
+         for (int i = 0; i < messages; i++) {
+            TextMessage message = session.createTextMessage("i = " + i);
+            message.setIntProperty("div", (i % numberOfDiverts));
+            message.setIntProperty("i", i);
+            producer.setPriority(RandomUtil.randomInterval(0, 9));
+            producer.send(message);
+         }
+         session.commit();
+      }
+
+      Wait.assertEquals(0L, () -> getMessageCount(managementDC1, SNF_QUEUE), 
5000, 100);
+      Wait.assertEquals(0L, () -> getMessageCount(managementDC2, SNF_QUEUE), 
5000, 100);
+
+      Wait.assertEquals(0L, () -> getMessageCount(managementDC1, QUEUE_NAME), 
5000, 100);
+      Wait.assertEquals(exclusive ? 0L : messagesPerDivert * numberOfDiverts, 
() -> getMessageCount(managementDC1, "Div"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC1, "Div.0"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC1, "Div.1"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC1, "Div.2"), 5000, 100);
+
+      Wait.assertEquals(0L, () -> getMessageCount(managementDC2, QUEUE_NAME), 
5000, 100);
+      Wait.assertEquals(exclusive ? 0L : messagesPerDivert * numberOfDiverts, 
() -> getMessageCount(managementDC2, "Div"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC2, "Div.0"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC2, "Div.1"), 5000, 100);
+      Wait.assertEquals(messagesPerDivert, () -> 
getMessageCount(managementDC2, "Div.2"), 5000, 100);
+
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE));
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE));
+
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE));
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE));
+
+      if (consume) {
+         try (Connection connection = 
connectionFactoryConsume.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+            connection.start();
+
+            // the main address "Div": it only holds messages when the diverts are non exclusive
+            try (MessageConsumer consumer = 
session.createConsumer(session.createQueue("Div"))) {
+               if (!exclusive) {
+                  for (int i = 0; i < messagesPerDivert * numberOfDiverts; 
i++) {
+                     TextMessage message = (TextMessage) 
consumer.receive(5000);
+                     assertNotNull(message);
+                     if (i % 100 == 0) {
+                        logger.info("processed {}", i);
+                     }
+                     if (i == 0) {
+                        checkProperties(connection, message);
+                     }
+                     session.commit();
+                  }
+               }
+               assertNull(consumer.receiveNoWait());
+            }
+            session.commit();
+         }
+
+         for (int div = 0; div <= 2; div++) {
+            for (int i = 0; i < messagesPerDivert; i++) {
+               try (Connection connection = 
connectionFactoryConsume.createConnection()) {
+                  try (Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED)) {
+                     try (MessageConsumer consumer = 
session.createConsumer(session.createQueue("Div." + div))) {
+                        connection.start();
+                        TextMessage message = (TextMessage) 
consumer.receive(5000);
+                        if (i % 10 == 0) {
+                           logger.info("i={} on div {}", i, div);
+                        }
+                        MessageProducer testProducer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+                        testProducer.send(message);
+                        assertNotNull(message);
+                        session.commit();
+                     }
+                  }
+               }
+            }
+
+            try (Connection connection = 
connectionFactoryConsume.createConnection()) {
+               try (Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED)) {
+                  try (MessageConsumer consumer = 
session.createConsumer(session.createQueue("Div." + div))) {
+                     assertNull(consumer.receiveNoWait(), "div " + div + " 
received a message");
+                  }
+                  session.commit();
+               }
+            }
+         }
+
+         try (Connection connection = 
connectionFactoryConsume.createConnection()) {
+            try (Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED)) {
+               try (MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME))) {
+                  connection.start();
+                  for (int i = 0; i < messagesPerDivert * numberOfDiverts; 
i++) {
+                     TextMessage message = (TextMessage) 
consumer.receive(5000);
+                     assertNotNull(message);
+                     if (i == 0) {
+                        checkProperties(connection, message);
+                     }
+                     session.commit();
+                  }
+                  assertNull(consumer.receiveNoWait());
+                  session.commit();
+               }
+            }
+         }
+
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, 
SNF_QUEUE), 30_000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, 
SNF_QUEUE), 30_000, 100);
+
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, 
QUEUE_NAME), 5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.0"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.1"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC1, "Div.2"), 
5000, 100);
+
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, 
QUEUE_NAME), 5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.0"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.1"), 
5000, 100);
+         Wait.assertEquals(0L, () -> getMessageCount(managementDC2, "Div.2"), 
5000, 100);
+      }
+
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE));
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE));
+   }
+
+   private void checkProperties(Connection connection, javax.jms.Message 
message) throws Exception {
+      try (Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+         TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+         MessageProducer producer = session.createProducer(temporaryQueue);
+         producer.send(message);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(temporaryQueue);
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         assertNotNull(receivedMessage);
+
+         // The cleanup of x-opt properties happens on the server's side.
+         // A message received directly from a mirrored queue may still carry them,
+         // however they should be cleaned up on the next send to avoid invalid IDs on 
the server
+         Enumeration propertyNames = receivedMessage.getPropertyNames();
+         while (propertyNames.hasMoreElements()) {
+            String property = String.valueOf(propertyNames.nextElement());
+            assertFalse(property.startsWith("x-opt"));
+         }
+      }
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java
index 147b68251e..dd0ad73348 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java
@@ -67,6 +67,9 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+
+   private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
    // Set this to true and log4j will be configured with some relevant 
log.trace for the AckManager at the server's
    private static final boolean TRACE_LOGS = 
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", 
"false"));
    private static final int NUMBER_MESSAGES = 
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 200);
@@ -79,6 +82,7 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
     * Time each consumer takes to process a message received to allow some 
messages accumulating.
     * This sleep happens right before the commit.
     */
+   private static final String QUEUE_NAME_LIST = 
"queueTest,Div,Div.0,Div.1,Div.2";
    private static final String QUEUE_NAME = "queueTest";
 
    private static String body;
@@ -180,7 +184,6 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
       cliCreateServer.createServer();
 
       Properties brokerProperties = new Properties();
-      brokerProperties.put("messageExpiryScanPeriod", "1000");
       brokerProperties.put("AMQPConnections." + connectionName + ".uri", 
mirrorURI);
       brokerProperties.put("AMQPConnections." + connectionName + 
".retryInterval", "1000");
       brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
@@ -270,7 +273,6 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
       saveProperties(brokerProperties, brokerPropertiesFile);
 
-
       File brokerXml = new File(serverLocation, "/etc/broker.xml");
       assertTrue(brokerXml.exists());
       assertTrue(FileUtil.findReplace(brokerXml, 
"amqpDuplicateDetection=true;", 
"amqpDuplicateDetection=true;ackManagerFlushTimeout=" + 
TimeUnit.MINUTES.toMillis(10) + ";"));
@@ -376,7 +378,6 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
       final int failbackAt = 700;
       final int lastKillAt = 1200;
       final int totalMessages = 1_800;
-      String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
 
       try (Connection connection = connectionFactoryDC1A.createConnection()) {
          connection.start();
@@ -431,7 +432,7 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
          session.commit();
       }
 
-      Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue));
+      Wait.assertEquals(0, () -> getMessageCount(managementDC1, SNF_QUEUE));
       Wait.assertEquals(oddSend, () -> getMessageCount(managementDC1, 
QUEUE_NAME));
       Wait.assertEquals(oddSend, () -> getMessageCount(managementDC2Backup, 
QUEUE_NAME));
 
@@ -451,10 +452,16 @@ public class ReplicatedBothNodesMirrorTest extends 
SoakTestBase {
          session.commit();
       }
 
-      Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue));
-      Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, 
snfQueue));
+      Wait.assertEquals(0, () -> getMessageCount(managementDC1, SNF_QUEUE));
+      Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, 
SNF_QUEUE));
       Wait.assertEquals(0, () -> getMessageCount(managementDC1, QUEUE_NAME));
       Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, 
QUEUE_NAME));
+
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE));
+      
LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_REPLICA_NODE));
+      LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE));
+      
LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_REPLICA_NODE));
+
    }
 
    @Test
diff --git 
a/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt 
b/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt
new file mode 100644
index 0000000000..d1c042403f
--- /dev/null
+++ b/tests/soak-tests/src/test/resources/DivertSoakMirrorTest-divert.txt
@@ -0,0 +1,25 @@
+      <diverts>
+         <divert name="Test.Div0">
+            <address>Div</address>
+            <forwarding-address>Div.0</forwarding-address>
+            <filter string="div=0"/>
+            <exclusive>BOOLEAN_VALUE</exclusive>
+         </divert>
+         <divert name="Test.Div1">
+            <address>Div</address>
+            <forwarding-address>Div.1</forwarding-address>
+            <filter string="div=1"/>
+            <exclusive>BOOLEAN_VALUE</exclusive>
+         </divert>
+         <divert name="Test.Div2">
+            <address>Div</address>
+            <forwarding-address>Div.2</forwarding-address>
+            <filter string="div=2"/>
+            <exclusive>BOOLEAN_VALUE</exclusive>
+         </divert>
+         <divert name="Test.NonExisting">
+            <address>Div</address>
+            <forwarding-address>Div.IDontExist</forwarding-address>
+            <exclusive>BOOLEAN_VALUE</exclusive>
+         </divert>
+      </diverts>
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to