This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c037386b6 ARTEMIS-4206 Unreferenced AMQP Large Messages not removed 
right away, requiring a reboot
2c037386b6 is described below

commit 2c037386b60a6a83d1d8548fea750525b3488567
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 14 13:15:07 2023 -0400

    ARTEMIS-4206 Unreferenced AMQP Large Messages not removed right away, 
requiring a reboot
---
 .../journal/AbstractJournalStorageManager.java     |   4 +
 .../integration/amqp/AmqpLargeMessageTest.java     |  88 +++++
 .../client/LargeMessageAvoidLargeMessagesTest.java |   8 +
 .../client/LargeMessageCompressTest.java           |   7 +
 .../tests/integration/client/LargeMessageTest.java |  46 +++
 tests/soak-tests/pom.xml                           |  60 ++++
 .../resources/servers/dual-federation1/broker.xml  | 281 ++++++++++++++++
 .../resources/servers/dual-federation2/broker.xml  | 280 ++++++++++++++++
 .../main/resources/servers/lmbroker1/broker.xml    | 281 ++++++++++++++++
 .../resources/servers/lmbroker1/management.xml     |  52 +++
 .../main/resources/servers/lmbroker2/broker.xml    | 283 ++++++++++++++++
 .../resources/servers/lmbroker2/management.xml     |  52 +++
 .../ClusteredLargeMessageInterruptTest.java        | 361 +++++++++++++++++++++
 13 files changed, 1803 insertions(+)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 91bb8131e7..7b86529a4b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1015,6 +1015,10 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                         storedLargeMessages.remove(message.getMessageID());
                      }
 
+                     if (message.isLargeMessage()) {
+                        largeMessages.add((LargeServerMessage) message);
+                     }
+
                      messages.put(record.id, message);
 
                      break;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 143576f38a..94bb09c7fa 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -46,6 +46,7 @@ import 
org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -994,6 +995,93 @@ public class AmqpLargeMessageTest extends 
AmqpClientTestSupport {
       }
    }
 
+
+   @Test
+   public void testDeleteUnreferencedMessage() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+         AmqpMessage message = createAmqpMessage((byte)'A', payload);
+         message.setDurable(true);
+         sender.send(message);
+         sender.close();
+      } finally {
+         connection.close();
+      }
+
+      final org.apache.activemq.artemis.core.server.Queue queue = 
server.locateQueue(getTestName());
+      queue.forEach(ref -> {
+         if (ref.getMessage().isLargeMessage()) {
+            try {
+               // simulating an ACK but the server crashed before the delete 
of the record, and the large message file
+               server.getStorageManager().storeAcknowledge(queue.getID(), 
ref.getMessageID());
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      });
+
+      server.stop();
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+
+      server.start();
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
+
+      validateNoFilesOnLargeDir();
+      runAfter(server::stop);
+   }
+
+   @Test
+   public void testSimpleLargeMessageRestart() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getTestName());
+         AmqpMessage message = createAmqpMessage((byte)'A', payload);
+         message.setDurable(true);
+         sender.send(message);
+         sender.close();
+      } finally {
+         connection.close();
+      }
+
+      server.stop();
+
+      AssertionLoggerHandler.startCapture();
+      server.start();
+
+      // These two should not happen as the consumer will receive them
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221019")); // 
unreferenced record
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221018")); // 
unreferenced large message
+
+      connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpReceiver receiver = session.createReceiver(getTestName());
+         receiver.flow(1);
+         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         message.accept();
+         receiver.close();
+         session.close();
+      } finally {
+         connection.close();
+      }
+
+      validateNoFilesOnLargeDir();
+      runAfter(server::stop);
+   }
+
+
    private void sendObjectMessages(int nMsgs, ConnectionFactory factory) 
throws Exception {
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
index 7c3bc3361f..ed9efdce5b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 /**
@@ -58,6 +59,13 @@ public class LargeMessageAvoidLargeMessagesTest extends 
LargeMessageTest {
       return 
super.createFactory(isNetty).setMinLargeMessageSize(10240).setCompressLargeMessage(true);
    }
 
+   @Override
+   @Test
+   public void testDeleteUnreferencedMessage() {
+      // this test makes no sense as it needs to delete a large message and 
its record
+      Assume.assumeFalse(true);
+   }
+
    @Test
    public void testSimpleSendOnAvoid() throws Exception {
       ActiveMQServer server = createServer(true, isNetty());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index a4e1a722cb..edc83e3441 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -86,6 +86,13 @@ public class LargeMessageCompressTest extends 
LargeMessageTest {
       return super.createFactory(isNetty).setCompressLargeMessage(true);
    }
 
+   @Override
+   @Test
+   public void testDeleteUnreferencedMessage() {
+      // this test makes no sense as it needs to delete a large message and 
its record
+      Assume.assumeFalse(true);
+   }
+
    @Test
    public void testLargeMessageCompressionNotCompressedAndBrowsed() throws 
Exception {
       final int messageSize = (int) (3.5 * 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 9ef85d999d..17e1d88c91 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.ServerProducer;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import 
org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -375,6 +376,51 @@ public class LargeMessageTest extends LargeMessageTestBase 
{
       validateNoFilesOnLargeDir();
    }
 
+   @Test
+   public void testDeleteUnreferencedMessage() throws Exception {
+      final int messageSize = (int) (3.5 * 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      server.createQueue(new 
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+      ClientSessionFactory sf = 
addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = addClientSession(sf.createSession(false, true, 
false));
+      ClientProducer producer = session.createProducer(getName());
+
+      Message clientFile = createLargeClientMessageStreaming(session, 
messageSize, true);
+
+      producer.send(clientFile);
+
+      session.close();
+
+      final Queue queue = server.locateQueue(getName());
+      queue.forEach(ref -> {
+         // simulating an ACK but the server crashed before the delete of the 
record, and the large message file
+         if (ref.getMessage().isLargeMessage()) {
+            try {
+               server.getStorageManager().storeAcknowledge(queue.getID(), 
ref.getMessageID());
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      });
+      server.stop();
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+
+      server.start();
+      Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
+
+      validateNoFilesOnLargeDir();
+      runAfter(server::stop);
+
+   }
+
 
    @Test
    public void testPendingRecord() throws Exception {
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 7626a743f9..3bc6398e53 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -273,6 +273,66 @@
                   </configuration>
                </execution>
 
+               <!-- Used on ClusteredLargeMessageInterruptTest -->
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-lmbroker1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>false</noWeb>
+                     <instance>${basedir}/target/lmbroker1</instance>
+                     
<configuration>${basedir}/target/classes/servers/lmbroker1</configuration>
+                     <args>
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                        <arg>--clustered</arg>
+                        <arg>--staticCluster</arg>
+                        <arg>tcp://localhost:61716</arg>
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                        <arg>--queues</arg>
+                        <arg>ClusteredLargeMessageInterruptTest</arg>
+                        <arg>--name</arg>
+                        <arg>lmbroker1</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-lmbroker2</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>false</noWeb>
+                     <instance>${basedir}/target/lmbroker2</instance>
+                     
<configuration>${basedir}/target/classes/servers/lmbroker2</configuration>
+                     <portOffset>100</portOffset>
+                     <args>
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                        <arg>--clustered</arg>
+                        <arg>--staticCluster</arg>
+                        <arg>tcp://localhost:61616</arg>
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                        <arg>--queues</arg>
+                        <arg>ClusteredLargeMessageInterruptTest</arg>
+                        <arg>--name</arg>
+                        <arg>lmbroker2</arg>
+                     </args>
+                  </configuration>
+               </execution>
             </executions>
          </plugin>
 
diff --git 
a/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml 
b/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml
new file mode 100644
index 0000000000..ba0293be91
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml
@@ -0,0 +1,281 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>dual-federation1</name>
+
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      
+      <!-- if you want to retain your journal uncomment this following 
configuration.
+
+      This will allow your system to keep 7 days of your data, up to 10G. 
Tweak it according to your use case and capacity.
+
+      it is recommended to use a separate storage unit from the journal for 
performance considerations.
+
+      <journal-retention-directory period="7" unit="DAYS" 
storage-limit="10G">data/retention</journal-retention-directory>
+
+      You can also enable retention by using the argument journal-retention on 
the `artemis create` command -->
+
+
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+            <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+      
+
+      <!-- the system will enter into page mode once you hit this limit. This 
is an estimate in bytes of how much the messages are using in memory
+
+      The system will use half of the available memory (-Xmx) by default for 
the global-max-size.
+      You may specify a different value here if you need to customize it to 
your needs.
+
+      <global-max-size>100Mb</global-max-size> -->
+
+      <!-- the maximum number of messages accepted before entering full 
address mode.
+           if global-max-size is specified the full address mode will be 
specified by whatever hits it first. -->
+      <global-max-messages>-1</global-max-messages>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+         <!-- amqpDuplicateDetection: If you are not using duplicate 
detection, set this to false
+                                      as duplicate detection requires 
applicationProperties to be parsed on the server. -->
+         <!-- amqpMinLargeMessageSize: Determines how many bytes are 
considered large, so we start using files to hold their data.
+                                       default: 102400, -1 would mean to 
disable large message control -->
+
+         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or 
Artemis 1.x clients add
+                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to 
the acceptor url.
+                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for 
more information. -->
+
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor 
name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor 
name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP 
for legacy HornetQ clients. -->
+         <acceptor 
name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor 
name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+      <connectors>
+         <connector 
name="otherside">tcp://localhost:61716?ackBatchSize=1;consumerWindowSize=-1</connector>
+      </connectors>
+
+      <federations>
+
+         <federation name="federation-1" user="artemis" password="artemis">
+            <upstream name="upstream-1">
+               <static-connectors>
+                  <connector-ref>otherside</connector-ref>
+               </static-connectors>
+               <policy ref="policy-1"/>
+            </upstream>
+
+            <queue-policy name="policy-1">
+               <include queue-match="#" address-match="test.#" />
+            </queue-policy>
+
+         </federation>
+
+      </federations>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+
+            <!-- if max-size-bytes and max-size-messages were both enabled, 
the system will enter into paging
+                 based on the first attribute to hit the maximum value -->
+            <!-- limit for the address in bytes, -1 means unlimited -->
+            <max-size-bytes>-1</max-size-bytes>
+
+            <!-- limit for the address in messages, -1 means unlimited -->
+            <max-size-messages>-1</max-size-messages>
+
+            <!-- the size of each file on paging. Notice we keep files in 
memory while they are in use.
+                 Lower this setting if you have too many queues in memory. -->
+            <page-size-bytes>10M</page-size-bytes>
+
+            <!-- limit how many messages are read from paging into the Queue. 
-->
+            <max-read-page-messages>-1</max-read-page-messages>
+
+            <!-- limit how much memory is read from paging into the Queue. -->
+            <max-read-page-bytes>20M</max-read-page-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-delete-queues>false</auto-delete-queues>
+            <auto-delete-addresses>false</auto-delete-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="test.queue">
+            <anycast>
+               <queue name="test.queue" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the Standard 
LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git 
a/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml 
b/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml
new file mode 100644
index 0000000000..35da6656cb
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml
@@ -0,0 +1,280 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>dual-federation1</name>
+
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      
+      <!-- if you want to retain your journal uncomment the following 
configuration.
+
+      This will allow your system to keep 7 days of your data, up to 10G. 
Tweak it accordingly to your use case and capacity.
+
+      it is recommended to use a separate storage unit from the journal for 
performance considerations.
+
+      <journal-retention-directory period="7" unit="DAYS" 
storage-limit="10G">data/retention</journal-retention-directory>
+
+      You can also enable retention by using the argument journal-retention on 
the `artemis create` command -->
+
+
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+            <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+      
+
+      <!-- the system will enter into page mode once you hit this limit. This 
is an estimate in bytes of how much the messages are using in memory
+
+      The system will use half of the available memory (-Xmx) by default for 
the global-max-size.
+      You may specify a different value here if you need to customize it to 
your needs.
+
+      <global-max-size>100Mb</global-max-size> -->
+
+      <!-- the maximum number of messages accepted before entering full 
address mode.
+           if global-max-size is specified the full address mode will be 
specified by whatever hits it first. -->
+      <global-max-messages>-1</global-max-messages>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+         <!-- amqpDuplicateDetection: If you are not using duplicate 
detection, set this to false
+                                      as duplicate detection requires 
applicationProperties to be parsed on the server. -->
+         <!-- amqpMinLargeMessageSize: Determines how many bytes are 
considered large, so we start using files to hold their data.
+                                       default: 102400, -1 would mean to 
disable large message control -->
+
+         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or 
Artemis 1.x clients add
+                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to 
the acceptor url.
+                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for 
more information. -->
+
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61716?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor 
name="amqp">tcp://0.0.0.0:5772?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor 
name="stomp">tcp://0.0.0.0:61713?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP 
for legacy HornetQ clients. -->
+         <acceptor 
name="hornetq">tcp://0.0.0.0:5545?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor 
name="mqtt">tcp://0.0.0.0:1983?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+      <connectors>
+         <connector 
name="otherside">tcp://localhost:61616?ackBatchSize=1;consumerWindowSize=-1</connector>
+      </connectors>
+
+      <federations>
+
+         <federation name="federation-1" user="artemis" password="artemis">
+            <upstream name="upstream-1">
+               <static-connectors>
+                  <connector-ref>otherside</connector-ref>
+               </static-connectors>
+               <policy ref="policy-1"/>
+            </upstream>
+
+            <queue-policy name="policy-1">
+               <include queue-match="#" address-match="test.#" />
+            </queue-policy>
+
+         </federation>
+
+      </federations>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+
+            <!-- if max-size-bytes and max-size-messages were both enabled, 
the system will enter into paging
+                 based on the first attribute to hit the maximum value -->
+            <!-- limit for the address in bytes, -1 means unlimited -->
+            <max-size-bytes>-1</max-size-bytes>
+
+            <!-- limit for the address in messages, -1 means unlimited -->
+            <max-size-messages>-1</max-size-messages>
+
+            <!-- the size of each file on paging. Notice we keep files in 
memory while they are in use.
+                 Lower this setting if you have too many queues in memory. -->
+            <page-size-bytes>10M</page-size-bytes>
+
+            <!-- limit how many messages are read from paging into the Queue. 
-->
+            <max-read-page-messages>-1</max-read-page-messages>
+
+            <!-- limit how much memory is read from paging into the Queue. -->
+            <max-read-page-bytes>20M</max-read-page-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-delete-queues>false</auto-delete-queues>
+            <auto-delete-addresses>false</auto-delete-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="test.queue">
+            <anycast>
+               <queue name="test.queue" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the Standard 
LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git a/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml 
b/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml
new file mode 100644
index 0000000000..e13498c62b
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml
@@ -0,0 +1,281 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>lmbroker1</name>
+
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      
+      <!-- if you want to retain your journal uncomment the following 
configuration.
+
+      This will allow your system to keep 7 days of your data, up to 10G. 
Tweak it accordingly to your use case and capacity.
+
+      it is recommended to use a separate storage unit from the journal for 
performance considerations.
+
+      <journal-retention-directory period="7" unit="DAYS" 
storage-limit="10G">data/retention</journal-retention-directory>
+
+      You can also enable retention by using the argument journal-retention on 
the `artemis create` command -->
+
+
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+            <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+      <connectors>
+            <!-- Connector used to be announced through cluster connections 
and notifications -->
+            <connector name="artemis">tcp://localhost:61616</connector>
+            <connector name = "node0">tcp://localhost:61716</connector>
+      </connectors>
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+      
+
+      <!-- the system will enter into page mode once you hit this limit. This 
is an estimate in bytes of how much the messages are using in memory
+
+      The system will use half of the available memory (-Xmx) by default for 
the global-max-size.
+      You may specify a different value here if you need to customize it to 
your needs.
+
+      <global-max-size>100Mb</global-max-size> -->
+
+      <!-- the maximum number of messages accepted before entering full 
address mode.
+           if global-max-size is specified the full address mode will be 
specified by whatever hits it first. -->
+      <global-max-messages>-1</global-max-messages>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+         <!-- amqpDuplicateDetection: If you are not using duplicate 
detection, set this to false
+                                      as duplicate detection requires 
applicationProperties to be parsed on the server. -->
+         <!-- amqpMinLargeMessageSize: Determines how many bytes are 
considered large, so we start using files to hold their data.
+                                       default: 102400, -1 would mean to 
disable large message control -->
+
+         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or 
Artemis 1.x clients add
+                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to 
the acceptor url.
+                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for 
more information. -->
+
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://localhost:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor 
name="amqp">tcp://localhost:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor 
name="stomp">tcp://localhost:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP 
for legacy HornetQ clients. -->
+         <acceptor 
name="hornetq">tcp://localhost:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor 
name="mqtt">tcp://localhost:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+      <cluster-user>cluster-admin</cluster-user>
+
+      <cluster-password>password-admin</cluster-password>
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <retry-interval>100</retry-interval>
+            
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>node0</connector-ref>
+
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+
+            <!-- if max-size-bytes and max-size-messages were both enabled, 
the system will enter into paging
+                 based on the first attribute to hit the maximum value -->
+            <!-- limit for the address in bytes, -1 means unlimited -->
+            <max-size-bytes>-1</max-size-bytes>
+
+            <!-- limit for the address in messages, -1 means unlimited -->
+            <max-size-messages>-1</max-size-messages>
+
+            <!-- the size of each file on paging. Notice we keep files in 
memory while they are in use.
+                 Lower this setting if you have too many queues in memory. -->
+            <page-size-bytes>10M</page-size-bytes>
+
+            <!-- limit how many messages are read from paging into the Queue. 
-->
+            <max-read-page-messages>-1</max-read-page-messages>
+
+            <!-- limit how much memory is read from paging into the Queue. -->
+            <max-read-page-bytes>20M</max-read-page-bytes>
+            <redistribution-delay>0</redistribution-delay>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-delete-queues>false</auto-delete-queues>
+            <auto-delete-addresses>false</auto-delete-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="ClusteredLargeMessageInterruptTest">
+            <anycast>
+               <queue name="ClusteredLargeMessageInterruptTest" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the Standard 
LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git 
a/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml 
b/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml
new file mode 100644
index 0000000000..1d38e28ac9
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.apache.org/schema">
+   <connector connector-port="1099"/>
+   <authorisation>
+      <allowlist>
+         <entry domain="hawtio"/>
+      </allowlist>
+      <default-access>
+         <access method="list*" roles="amq"/>
+         <access method="get*" roles="amq"/>
+         <access method="is*" roles="amq"/>
+         <access method="set*" roles="amq"/>
+         <access method="*" roles="amq"/>
+      </default-access>
+      <role-access>
+         <match domain="org.apache.activemq.artemis">
+            <access method="list*" roles="amq"/>
+            <access method="get*" roles="amq"/>
+            <access method="is*" roles="amq"/>
+            <access method="set*" roles="amq"/>
+            <!-- Note count and browse are needed to access the browse tab in 
the console-->
+            <access method="browse*" roles="amq"/>
+            <access method="count*" roles="amq"/>
+            <access method="*" roles="amq"/>
+         </match>
+         <!--example of how to configure a specific object-->
+         <!--<match domain="org.apache.activemq.artemis" 
key="subcomponent=queues">
+            <access method="list*" roles="view,update,amq"/>
+            <access method="get*" roles="view,update,amq"/>
+            <access method="is*" roles="view,update,amq"/>
+            <access method="set*" roles="update,amq"/>
+            <access method="*" roles="amq"/>
+         </match>-->
+      </role-access>
+   </authorisation>
+</management-context>
\ No newline at end of file
diff --git a/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml 
b/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml
new file mode 100644
index 0000000000..b9c8793702
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml
@@ -0,0 +1,283 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xmlns:xi="http://www.w3.org/2001/XInclude";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>lmbroker2</name>
+
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      
+      <!-- if you want to retain your journal uncomment this following 
configuration.
+
+      This will allow your system to keep 7 days of your data, up to 10G. 
Tweak it accordingly to your use case and capacity.
+
+      it is recommended to use a separate storage unit from the journal for 
performance considerations.
+
+      <journal-retention-directory period="7" unit="DAYS" 
storage-limit="10G">data/retention</journal-retention-directory>
+
+      You can also enable retention by using the argument journal-retention on 
the `artemis create` command -->
+
+
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+            <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs; a successful ping to any 
of them will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+      <connectors>
+            <!-- Connector used to be announced through cluster connections 
and notifications -->
+            <connector name="artemis">tcp://localhost:61716</connector>
+            <connector name = "node0">tcp://localhost:61616</connector>
+      </connectors>
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+      
+
+      <!-- the system will enter into page mode once you hit this limit. This 
is an estimate in bytes of how much the messages are using in memory
+
+      The system will use half of the available memory (-Xmx) by default for 
the global-max-size.
+      You may specify a different value here if you need to customize it to 
your needs.
+
+      <global-max-size>100Mb</global-max-size> -->
+
+      <!-- the maximum number of messages accepted before entering full 
address mode.
+           if global-max-size is specified the full address mode will be 
specified by whatever hits it first. -->
+      <global-max-messages>-1</global-max-messages>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+         <!-- amqpDuplicateDetection: If you are not using duplicate 
detection, set this to false
+                                      as duplicate detection requires 
applicationProperties to be parsed on the server. -->
+         <!-- amqpMinLargeMessageSize: Determines how many bytes are 
considered large, so we start using files to hold their data.
+                                       default: 102400, -1 would mean to 
disable large message control -->
+
+         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or 
Artemis 1.x clients add
+                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to 
the acceptor url.
+                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for 
more information. -->
+
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://localhost:61716?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor 
name="amqp">tcp://localhost:5772?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor 
name="stomp">tcp://localhost:61713?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP 
for legacy HornetQ clients. -->
+         <acceptor 
name="hornetq">tcp://localhost:5545?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor 
name="mqtt">tcp://localhost:1983?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+
+      <cluster-user>cluster-admin</cluster-user>
+
+      <cluster-password>password-admin</cluster-password>
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <retry-interval>100</retry-interval>
+            
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>node0</connector-ref>
+
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+
+            <!-- if max-size-bytes and max-size-messages were both enabled, 
the system will enter into paging
+                 based on the first attribute to hit the maximum value -->
+            <!-- limit for the address in bytes, -1 means unlimited -->
+            <max-size-bytes>-1</max-size-bytes>
+
+            <!-- limit for the address in messages, -1 means unlimited -->
+            <max-size-messages>-1</max-size-messages>
+
+            <!-- the size of each file on paging. Notice we keep files in 
memory while they are in use.
+                 Lower this setting if you have too many queues in memory. -->
+            <page-size-bytes>10M</page-size-bytes>
+
+            <!-- limit how many messages are read from paging into the Queue. 
-->
+            <max-read-page-messages>-1</max-read-page-messages>
+
+            <!-- limit how much memory is read from paging into the Queue. -->
+            <max-read-page-bytes>20M</max-read-page-bytes>
+
+            <redistribution-delay>0</redistribution-delay>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-delete-queues>false</auto-delete-queues>
+            <auto-delete-addresses>false</auto-delete-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="ClusteredLargeMessageInterruptTest">
+            <anycast>
+               <queue name="ClusteredLargeMessageInterruptTest" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the Standard 
LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git 
a/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml 
b/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml
new file mode 100644
index 0000000000..4c2b254bd5
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.apache.org/schema";>
+   <connector connector-port="1199"/>
+   <authorisation>
+      <allowlist>
+         <entry domain="hawtio"/>
+      </allowlist>
+      <default-access>
+         <access method="list*" roles="amq"/>
+         <access method="get*" roles="amq"/>
+         <access method="is*" roles="amq"/>
+         <access method="set*" roles="amq"/>
+         <access method="*" roles="amq"/>
+      </default-access>
+      <role-access>
+         <match domain="org.apache.activemq.artemis">
+            <access method="list*" roles="amq"/>
+            <access method="get*" roles="amq"/>
+            <access method="is*" roles="amq"/>
+            <access method="set*" roles="amq"/>
+            <!-- Note count and browse are needed to access the browse tab in 
the console-->
+            <access method="browse*" roles="amq"/>
+            <access method="count*" roles="amq"/>
+            <access method="*" roles="amq"/>
+         </match>
+         <!--example of how to configure a specific object-->
+         <!--<match domain="org.apache.activemq.artemis" 
key="subcomponent=queues">
+            <access method="list*" roles="view,update,amq"/>
+            <access method="get*" roles="view,update,amq"/>
+            <access method="is*" roles="view,update,amq"/>
+            <access method="set*" roles="update,amq"/>
+            <access method="*" roles="amq"/>
+         </match>-->
+      </role-access>
+   </authorisation>
+</management-context>
\ No newline at end of file
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
new file mode 100644
index 0000000000..fd58bb0eab
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final String SERVER_NAME_0 = "lmbroker1";
+   public static final String SERVER_NAME_1 = "lmbroker2";
+
+
+
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_SERVER_PORT_0 = 1099;
+   private static final int JMX_SERVER_PORT_1 = 1199;
+
+   static String server1URI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+   static ObjectNameBuilder builderServer1 = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lmbroker1", true);
+
+   static String server2URI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi";
+   static ObjectNameBuilder builderServer2 = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lmbroker2", true);
+
+   // Stop flags polled by the worker loops; volatile so a write from the test thread is seen by the workers.
+   private volatile boolean runningSend = true;
+   private volatile boolean runningConsumer = true;
+   // Incremented by consumers when a received body does not start with the expected payload.
+   private final AtomicInteger errors = new AtomicInteger(0);
+
+   // BODY_SIZE is a compile-time constant, so createBody() sees its value even though largebody is declared first.
+   static final String largebody = createBody();
+   static final int BODY_SIZE = 500 * 1024;
+
+   /**
+    * Builds the text payload that the producers in this test send.
+    * <p>
+    * A fixed sentence is appended repeatedly until the text is at least {@link #BODY_SIZE} characters long,
+    * so the result may overshoot that size by up to one sentence.
+    *
+    * @return the generated body
+    */
+   private static String createBody() {
+      // StringBuilder instead of StringBuffer: the accumulator never leaves this method, so no locking is needed.
+      StringBuilder buffer = new StringBuilder();
+      while (buffer.length() < BODY_SIZE) {
+         buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T REALLY CARE ");
+      }
+      return buffer.toString();
+   }
+
+   Process serverProcess;
+   Process serverProcess2;
+
+   public ConnectionFactory createConnectionFactory(int broker, String 
protocol) {
+      if (protocol.equals("CORE")) {
+         switch (broker) {
+            // I need the connections stable in the selected server
+            case 0:
+               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+            case 1:
+               return new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+            default:
+               logger.warn("undefined argument {}", broker);
+               throw new IllegalArgumentException("undefined");
+         }
+      } else {
+         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + 
(61616 + broker * 100) + "?ha=false");
+      }
+   }
+
+   // Runs before each test: calls cleanupData for both brokers, starts them, then calls disableCheckThread.
+   // NOTE(review): cleanupData, startServer and disableCheckThread are inherited helpers; their exact behavior is not visible here.
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      cleanupData(SERVER_NAME_1);
+      serverProcess = startServer0();
+      serverProcess2 = startServer1();
+      disableCheckThread();
+   }
+
+   // Starts lmbroker1. Presumably 0 is a port offset and 30000 a start-up timeout in ms - TODO confirm against startServer.
+   private Process startServer0() throws Exception {
+      return startServer(SERVER_NAME_0, 0, 30000);
+   }
+
+   // Starts lmbroker2. Presumably 100 is a port offset and 30000 a start-up timeout in ms - TODO confirm against startServer.
+   private Process startServer1() throws Exception {
+      return startServer(SERVER_NAME_1, 100, 30000);
+   }
+
+   // Interrupt scenario over AMQP with transacted sessions.
+   @Test
+   public void testLargeMessageAMQPTX() throws Throwable {
+      testInterrupt("AMQP", true);
+   }
+
+   // Interrupt scenario over AMQP with auto-acknowledge sessions.
+   @Test
+   public void testInterruptAMQPNonTX() throws Throwable {
+      testInterrupt("AMQP", false);
+   }
+
+   // Interrupt scenario over CORE with transacted sessions.
+   @Test
+   public void testInterruptCORETX() throws Throwable {
+      testInterrupt("CORE", true);
+   }
+
+   // Interrupt scenario over OPENWIRE with transacted sessions.
+   @Test
+   public void testInterruptOPENWIRETX() throws Throwable {
+      testInterrupt("OPENWIRE", true);
+   }
+
+   // Interrupt scenario over CORE with auto-acknowledge sessions.
+   @Test
+   public void testInterruptCORENonTX() throws Throwable {
+      testInterrupt("CORE", false);
+   }
+
+   /**
+    * Starts {@code threads} producer tasks that keep sending {@link #largebody} to {@code queueName} until
+    * {@link #runningSend} is cleared or an exception is thrown (for example because the broker was killed).
+    * <p>
+    * All producers wait on a shared barrier (up to 10 seconds) so they start sending together.
+    *
+    * @param executor  executor that runs the producer tasks
+    * @param protocol  protocol name passed to {@link #createConnectionFactory(int, String)}
+    * @param broker    index of the broker to connect to
+    * @param threads   number of producer tasks
+    * @param tx        whether each producer uses a transacted session and commits after every send
+    * @param queueName destination queue
+    * @return a latch that reaches zero once every producer task has finished
+    */
+   private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
+      runningSend = true;
+      CountDownLatch done = new CountDownLatch(threads);
+
+      ConnectionFactory factory = createConnectionFactory(broker, protocol);
+      final CyclicBarrier startFlag = new CyclicBarrier(threads);
+
+      for (int i = 0; i < threads; i++) {
+         int threadID = i;
+         executor.execute(() -> {
+            int numberOfMessages = 0;
+            Connection connection = null;
+            try {
+               connection = factory.createConnection();
+               Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = session.createProducer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (runningSend) {
+                  producer.send(session.createTextMessage(largebody));
+                  if (tx) {
+                     session.commit();
+                  }
+                  if (numberOfMessages++ % 10 == 0) {
+                     logger.info("Sent {}", numberOfMessages);
+                  }
+               }
+            } catch (Exception e) {
+               // failures are expected here: the test kills the broker while producers are still sending
+               logger.info("Thread {} got an error {}", threadID, e.getMessage());
+            } finally {
+               // signal completion first so a slow close cannot delay the controlling test thread
+               done.countDown();
+               logger.info("CountDown:: current Count {}", done.getCount());
+               if (connection != null) {
+                  try {
+                     connection.close();
+                  } catch (Exception ignored) {
+                     // best effort: the broker behind this connection may already be gone
+                  }
+               }
+            }
+         });
+      }
+
+      return done;
+   }
+
+
+   /**
+    * Starts {@code threads} consumer tasks that keep receiving from {@code queueName} until
+    * {@link #runningConsumer} is cleared or an exception is thrown (for example because the broker was killed).
+    * <p>
+    * Every received body must start with {@link #largebody}; a mismatch is logged and counted in {@link #errors}.
+    * All consumers wait on a shared barrier (up to 10 seconds) so they start receiving together.
+    *
+    * @param executor  executor that runs the consumer tasks
+    * @param protocol  protocol name passed to {@link #createConnectionFactory(int, String)}
+    * @param broker    index of the broker to connect to
+    * @param threads   number of consumer tasks
+    * @param tx        whether each consumer uses a transacted session and commits after every message
+    * @param queueName source queue
+    * @return a latch that reaches zero once every consumer task has finished
+    */
+   private CountDownLatch startConsumingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
+      runningConsumer = true;
+      CountDownLatch done = new CountDownLatch(threads);
+
+      ConnectionFactory factory = createConnectionFactory(broker, protocol);
+      final CyclicBarrier startFlag = new CyclicBarrier(threads);
+
+      for (int i = 0; i < threads; i++) {
+         int threadID = i;
+         executor.execute(() -> {
+            int numberOfMessages = 0;
+            Connection connection = null;
+            try {
+               connection = factory.createConnection();
+               connection.start();
+               Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (runningConsumer) {
+                  // short receive timeout so the loop re-checks runningConsumer regularly
+                  TextMessage message = (TextMessage)consumer.receive(100);
+                  if (message != null) {
+                     if (!message.getText().startsWith(largebody)) {
+                        logger.warn("Body does not match!");
+                        errors.incrementAndGet();
+                     }
+                     if (tx) {
+                        session.commit();
+                     }
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Received {}", numberOfMessages);
+                     }
+                  }
+               }
+            } catch (Exception e) {
+               // failures are expected here (the test kills the broker), but they should still leave a trace in the log
+               logger.info("Consumer thread {} got an error {}", threadID, e.getMessage());
+            } finally {
+               logger.info("Done receiving");
+               // signal completion first so a slow close cannot delay the controlling test thread
+               done.countDown();
+               if (connection != null) {
+                  try {
+                     connection.close();
+                  } catch (Exception ignored) {
+                     // best effort: the broker behind this connection may already be gone
+                  }
+               }
+            }
+         });
+      }
+
+      return done;
+   }
+
+
+
+   /**
+    * Kills and restarts each broker while producers and consumers are active, then verifies that both queues
+    * drain, that no large-message files are left on disk and that no consumer saw a corrupted body.
+    * <p>
+    * This test has sleeps because the workers keep sending while the brokers are active; this method acts as
+    * the controller that tells the worker threads when to stop.
+    *
+    * @param protocol protocol used by the producers and consumers
+    * @param tx       whether the workers use transacted sessions
+    */
+   private void testInterrupt(String protocol, boolean tx) throws Throwable {
+      final int SENDING_THREADS = 10;
+      final int CONSUMING_THREADS = 10;
+      // No local "errors" counter here: a local one would shadow the field the consumers increment,
+      // and the final assertion would then always pass.
+
+      String queueName = "ClusteredLargeMessageInterruptTest";
+
+      ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      // phase 1: traffic on broker 0, which is then killed while the workers are running
+      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
+      CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+
+      serverProcess.destroyForcibly();
+      runningSend = false;
+      runningConsumer = false;
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      logger.info("All receivers and senders are done!!!");
+
+      serverProcess = startServer0();
+
+      Thread.sleep(2000);
+
+      // phase 2: traffic on broker 1, which is killed right after the workers are submitted
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+      serverProcess2.destroyForcibly();
+      Assert.assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
+      runningSend = false;
+      runningConsumer = false;
+      Assert.assertTrue(sendDone.await(1, TimeUnit.MINUTES));
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+      serverProcess2 = startServer1();
+
+      // phase 3: both brokers are up; send for a while, then let the consumers drain everything
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+      runningSend = false;
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
+      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      Wait.assertEquals(0, queueControl1::getMessageCount);
+      Wait.assertEquals(0, queueControl2::getMessageCount);
+
+      runningConsumer = false;
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
+      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");
+
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+      Wait.assertEquals(0, () -> lmFolder2.listFiles().length);
+      // checks the field incremented by the consumers on a body mismatch
+      Assert.assertEquals(0, errors.get());
+   }
+
+   // Bridge-failure scenario over AMQP with auto-acknowledge sessions.
+   @Test
+   public void testBridgeFailureAMQP() throws Throwable {
+      testInterruptFailOnBridge("AMQP", false);
+   }
+
+   // Bridge-failure scenario over CORE with auto-acknowledge sessions.
+   @Test
+   public void testBridgeFailureCORE() throws Throwable {
+      testInterruptFailOnBridge("CORE", false);
+   }
+
+
+   /**
+    * A slight variation of {@link #testInterrupt(String, boolean)}: messages are first sent to broker 0 with no
+    * consumer attached, and the consumers are then started on broker 1. This is to force messages to be
+    * redistributed and to try to make the bridge fail.
+    * <p>
+    * This could have been a parameter on {@code testInterrupt}; the code was copied instead for simplicity.
+    *
+    * @param protocol protocol used by the producers and consumers
+    * @param tx       whether the workers use transacted sessions
+    */
+   private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throwable {
+      final int SENDING_THREADS = 10;
+      final int CONSUMING_THREADS = 10;
+      // No local "errors" counter here: a local one would shadow the field the consumers increment,
+      // and the final assertion would then always pass.
+
+      String queueName = "ClusteredLargeMessageInterruptTest";
+
+      ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      // only start the sender for a while
+      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+
+      runningSend = runningConsumer = false;
+
+      serverProcess.destroyForcibly();
+      // NOTE(review): this waits up to 10 MINUTES while every other wait in this class uses seconds - confirm it is intended
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+      // NOTE(review): broker 0 was already destroyed above, so this second destroy looks redundant - confirm
+      serverProcess.destroyForcibly();
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      serverProcess = startServer0();
+
+      Thread.sleep(5000);
+      runningSend = false;
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
+      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
+      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");
+      // the result of this wait is not checked; the folders are asserted empty again further down
+      Wait.waitFor(() -> lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);
+      Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0);
+
+      runningConsumer = false;
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+      Wait.assertEquals(0, () -> {
+         logger.info("queueControl2.count={}", queueControl2.getMessageCount());
+         return lmFolder2.listFiles().length;
+      });
+      // checks the field incremented by the consumers on a body mismatch
+      Assert.assertEquals(0, errors.get());
+
+   }
+
+
+}
\ No newline at end of file


Reply via email to