This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 2c037386b6 ARTEMIS-4206 Unreferenced AMQP Large Messages not removed
right away, requiring a reboot
2c037386b6 is described below
commit 2c037386b60a6a83d1d8548fea750525b3488567
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 14 13:15:07 2023 -0400
ARTEMIS-4206 Unreferenced AMQP Large Messages not removed right away,
requiring a reboot
---
.../journal/AbstractJournalStorageManager.java | 4 +
.../integration/amqp/AmqpLargeMessageTest.java | 88 +++++
.../client/LargeMessageAvoidLargeMessagesTest.java | 8 +
.../client/LargeMessageCompressTest.java | 7 +
.../tests/integration/client/LargeMessageTest.java | 46 +++
tests/soak-tests/pom.xml | 60 ++++
.../resources/servers/dual-federation1/broker.xml | 281 ++++++++++++++++
.../resources/servers/dual-federation2/broker.xml | 280 ++++++++++++++++
.../main/resources/servers/lmbroker1/broker.xml | 281 ++++++++++++++++
.../resources/servers/lmbroker1/management.xml | 52 +++
.../main/resources/servers/lmbroker2/broker.xml | 283 ++++++++++++++++
.../resources/servers/lmbroker2/management.xml | 52 +++
.../ClusteredLargeMessageInterruptTest.java | 361 +++++++++++++++++++++
13 files changed, 1803 insertions(+)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 91bb8131e7..7b86529a4b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1015,6 +1015,10 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
storedLargeMessages.remove(message.getMessageID());
}
+ if (message.isLargeMessage()) {
+ largeMessages.add((LargeServerMessage) message);
+ }
+
messages.put(record.id, message);
break;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 143576f38a..94bb09c7fa 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -46,6 +46,7 @@ import
org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
@@ -994,6 +995,93 @@ public class AmqpLargeMessageTest extends
AmqpClientTestSupport {
}
}
+
+ @Test
+ public void testDeleteUnreferencedMessage() throws Exception {
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = createAmqpMessage((byte)'A', payload);
+ message.setDurable(true);
+ sender.send(message);
+ sender.close();
+ } finally {
+ connection.close();
+ }
+
+ final org.apache.activemq.artemis.core.server.Queue queue =
server.locateQueue(getTestName());
+ queue.forEach(ref -> {
+ if (ref.getMessage().isLargeMessage()) {
+ try {
+ // simulating an ACK but the server crashed before the delete
of the record, and the large message file
+ server.getStorageManager().storeAcknowledge(queue.getID(),
ref.getMessageID());
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ });
+
+ server.stop();
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+
+ server.start();
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
+
+ validateNoFilesOnLargeDir();
+ runAfter(server::stop);
+ }
+
+ @Test
+ public void testSimpleLargeMessageRestart() throws Exception {
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = createAmqpMessage((byte)'A', payload);
+ message.setDurable(true);
+ sender.send(message);
+ sender.close();
+ } finally {
+ connection.close();
+ }
+
+ server.stop();
+
+ AssertionLoggerHandler.startCapture();
+ server.start();
+
+ // These two should not happen as the consumer will receive them
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221019")); //
unreferenced record
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221018")); //
unreferenced large message
+
+ connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ message.accept();
+ receiver.close();
+ session.close();
+ } finally {
+ connection.close();
+ }
+
+ validateNoFilesOnLargeDir();
+ runAfter(server::stop);
+ }
+
+
private void sendObjectMessages(int nMsgs, ConnectionFactory factory)
throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
index 7c3bc3361f..ed9efdce5b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
/**
@@ -58,6 +59,13 @@ public class LargeMessageAvoidLargeMessagesTest extends
LargeMessageTest {
return
super.createFactory(isNetty).setMinLargeMessageSize(10240).setCompressLargeMessage(true);
}
+ @Override
+ @Test
+ public void testDeleteUnreferencedMessage() {
+ // this test makes no sense as it needs to delete a large message and
its record
+ Assume.assumeFalse(true);
+ }
+
@Test
public void testSimpleSendOnAvoid() throws Exception {
ActiveMQServer server = createServer(true, isNetty());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index a4e1a722cb..edc83e3441 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -86,6 +86,13 @@ public class LargeMessageCompressTest extends
LargeMessageTest {
return super.createFactory(isNetty).setCompressLargeMessage(true);
}
+ @Override
+ @Test
+ public void testDeleteUnreferencedMessage() {
+ // this test makes no sense as it needs to delete a large message and
its record
+ Assume.assumeFalse(true);
+ }
+
@Test
public void testLargeMessageCompressionNotCompressedAndBrowsed() throws
Exception {
final int messageSize = (int) (3.5 *
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 9ef85d999d..17e1d88c91 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.ServerProducer;
import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import
org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -375,6 +376,51 @@ public class LargeMessageTest extends LargeMessageTestBase
{
validateNoFilesOnLargeDir();
}
+ @Test
+ public void testDeleteUnreferencedMessage() throws Exception {
+ final int messageSize = (int) (3.5 *
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+ server.start();
+
+ server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+ ClientSessionFactory sf =
addSessionFactory(createSessionFactory(locator));
+
+ ClientSession session = addClientSession(sf.createSession(false, true,
false));
+ ClientProducer producer = session.createProducer(getName());
+
+ Message clientFile = createLargeClientMessageStreaming(session,
messageSize, true);
+
+ producer.send(clientFile);
+
+ session.close();
+
+ final Queue queue = server.locateQueue(getName());
+ queue.forEach(ref -> {
+ // simulating an ACK but the server crashed before the delete of the
record, and the large message file
+ if (ref.getMessage().isLargeMessage()) {
+ try {
+ server.getStorageManager().storeAcknowledge(queue.getID(),
ref.getMessageID());
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ });
+ server.stop();
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+
+ server.start();
+ Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
+
+ validateNoFilesOnLargeDir();
+ runAfter(server::stop);
+
+ }
+
@Test
public void testPendingRecord() throws Exception {
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 7626a743f9..3bc6398e53 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -273,6 +273,66 @@
</configuration>
</execution>
+ <!-- Used on ClusteredLargeMessageInterruptTest -->
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-lmbroker1</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>false</noWeb>
+ <instance>${basedir}/target/lmbroker1</instance>
+
<configuration>${basedir}/target/classes/servers/lmbroker1</configuration>
+ <args>
+ <arg>--java-options</arg>
+ <arg>-Djava.rmi.server.hostname=localhost</arg>
+ <arg>--clustered</arg>
+ <arg>--staticCluster</arg>
+ <arg>tcp://localhost:61716</arg>
+ <arg>--java-options</arg>
+ <arg>-Djava.rmi.server.hostname=localhost</arg>
+ <arg>--queues</arg>
+ <arg>ClusteredLargeMessageInterruptTest</arg>
+ <arg>--name</arg>
+ <arg>lmbroker1</arg>
+ </args>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-lmbroker2</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>false</noWeb>
+ <instance>${basedir}/target/lmbroker2</instance>
+
<configuration>${basedir}/target/classes/servers/lmbroker2</configuration>
+ <portOffset>100</portOffset>
+ <args>
+ <arg>--java-options</arg>
+ <arg>-Djava.rmi.server.hostname=localhost</arg>
+ <arg>--clustered</arg>
+ <arg>--staticCluster</arg>
+ <arg>tcp://localhost:61616</arg>
+ <arg>--java-options</arg>
+ <arg>-Djava.rmi.server.hostname=localhost</arg>
+ <arg>--queues</arg>
+ <arg>ClusteredLargeMessageInterruptTest</arg>
+ <arg>--name</arg>
+ <arg>lmbroker2</arg>
+ </args>
+ </configuration>
+ </execution>
</executions>
</plugin>
diff --git
a/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml
b/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml
new file mode 100644
index 0000000000..ba0293be91
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/dual-federation1/broker.xml
@@ -0,0 +1,281 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>dual-federation1</name>
+
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+
+ <!-- if you want to retain your journal uncomment this following
configuration.
+
+ This will allow your system to keep 7 days of your data, up to 10G.
Tweak it accordingly to your use case and capacity.
+
+ it is recommended to use a separate storage unit from the journal for
performance considerations.
+
+ <journal-retention-directory period="7" unit="DAYS"
storage-limit="10G">data/retention</journal-retention-directory>
+
+ You can also enable retention by using the argument journal-retention on
the `artemis create` command -->
+
+
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect deadlocks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+
+ <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for
the global-max-size.
+ You may specify a different value here if you need to customize it to
your needs.
+
+ <global-max-size>100Mb</global-max-size> -->
+
+ <!-- the maximum number of messages accepted before entering full
address mode.
+ if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
+ <global-max-messages>-1</global-max-messages>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+ <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
+ as duplicate detection requires
applicationProperties to be parsed on the server. -->
+ <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
+ default: 102400, -1 would mean to
disable large message control -->
+
+ <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
+ "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
+ See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
+
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+ <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
+ <acceptor
name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor
name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
+ <acceptor
name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor
name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+ <connectors>
+ <connector
name="otherside">tcp://localhost:61716?ackBatchSize=1;consumerWindowSize=-1</connector>
+ </connectors>
+
+ <federations>
+
+ <federation name="federation-1" user="artemis" password="artemis">
+ <upstream name="upstream-1">
+ <static-connectors>
+ <connector-ref>otherside</connector-ref>
+ </static-connectors>
+ <policy ref="policy-1"/>
+ </upstream>
+
+ <queue-policy name="policy-1">
+ <include queue-match="#" address-match="test.#" />
+ </queue-policy>
+
+ </federation>
+
+ </federations>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+
+ <!-- if max-size-bytes and max-size-messages were both enabled,
the system will enter into paging
+ based on the first attribute to hit the maximum value -->
+ <!-- limit for the address in bytes, -1 means unlimited -->
+ <max-size-bytes>-1</max-size-bytes>
+
+ <!-- limit for the address in messages, -1 means unlimited -->
+ <max-size-messages>-1</max-size-messages>
+
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+
+ <!-- limit how many messages are read from paging into the Queue.
-->
+ <max-read-page-messages>-1</max-read-page-messages>
+
+ <!-- limit how much memory is read from paging into the Queue. -->
+ <max-read-page-bytes>20M</max-read-page-bytes>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-delete-queues>false</auto-delete-queues>
+ <auto-delete-addresses>false</auto-delete-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="test.queue">
+ <anycast>
+ <queue name="test.queue" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin plugin to log events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml
b/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml
new file mode 100644
index 0000000000..35da6656cb
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/dual-federation2/broker.xml
@@ -0,0 +1,280 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>dual-federation1</name>
+
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+
+ <!-- if you want to retain your journal uncomment this following
configuration.
+
+ This will allow your system to keep 7 days of your data, up to 10G.
Tweak it accordingly to your use case and capacity.
+
+ it is recommended to use a separate storage unit from the journal for
performance considerations.
+
+ <journal-retention-directory period="7" unit="DAYS"
storage-limit="10G">data/retention</journal-retention-directory>
+
+ You can also enable retention by using the argument journal-retention on
the `artemis create` command -->
+
+
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect deadlocks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+
+ <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for
the global-max-size.
+ You may specify a different value here if you need to customize it to
your needs.
+
+ <global-max-size>100Mb</global-max-size> -->
+
+ <!-- the maximum number of messages accepted before entering full
address mode.
+ if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
+ <global-max-messages>-1</global-max-messages>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+ <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
+ as duplicate detection requires
applicationProperties to be parsed on the server. -->
+ <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
+ default: 102400, -1 would mean to
disable large message control -->
+
+ <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
+ "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
+ See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
+
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61716?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+ <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
+ <acceptor
name="amqp">tcp://0.0.0.0:5772?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor
name="stomp">tcp://0.0.0.0:61713?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
+ <acceptor
name="hornetq">tcp://0.0.0.0:5545?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor
name="mqtt">tcp://0.0.0.0:1983?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+ <connectors>
+ <connector
name="otherside">tcp://localhost:61616?ackBatchSize=1;consumerWindowSize=-1</connector>
+ </connectors>
+
+ <federations>
+
+ <federation name="federation-1" user="artemis" password="artemis">
+ <upstream name="upstream-1">
+ <static-connectors>
+ <connector-ref>otherside</connector-ref>
+ </static-connectors>
+ <policy ref="policy-1"/>
+ </upstream>
+
+ <queue-policy name="policy-1">
+ <include queue-match="#" address-match="test.#" />
+ </queue-policy>
+
+ </federation>
+
+ </federations>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+
+ <!-- if max-size-bytes and max-size-messages were both enabled,
the system will enter into paging
+ based on the first attribute to hit the maximum value -->
+ <!-- limit for the address in bytes, -1 means unlimited -->
+ <max-size-bytes>-1</max-size-bytes>
+
+ <!-- limit for the address in messages, -1 means unlimited -->
+ <max-size-messages>-1</max-size-messages>
+
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+
+ <!-- limit how many messages are read from paging into the Queue.
-->
+ <max-read-page-messages>-1</max-read-page-messages>
+
+ <!-- limit how much memory is read from paging into the Queue. -->
+ <max-read-page-bytes>20M</max-read-page-bytes>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-delete-queues>false</auto-delete-queues>
+ <auto-delete-addresses>false</auto-delete-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="test.queue">
+ <anycast>
+ <queue name="test.queue" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin plugin to log events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git a/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml
b/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml
new file mode 100644
index 0000000000..e13498c62b
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker1/broker.xml
@@ -0,0 +1,281 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>lmbroker1</name>
+
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+
+ <!-- if you want to retain your journal uncomment this following
configuration.
+
+ This will allow your system to keep 7 days of your data, up to 10G.
Tweak it accordingly to your use case and capacity.
+
+ it is recommended to use a separate storage unit from the journal for
performance considerations.
+
+ <journal-retention-directory period="7" unit="DAYS"
storage-limit="10G">data/retention</journal-retention-directory>
+
+ You can also enable retention by using the argument journal-retention on
the `artemis create` command -->
+
+
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+ <connectors>
+ <!-- Connector used to be announced through cluster connections
and notifications -->
+ <connector name="artemis">tcp://localhost:61616</connector>
+ <connector name = "node0">tcp://localhost:61716</connector>
+ </connectors>
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect dead locks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+
+ <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for
the global-max-size.
+ You may specify a different value here if you need to customize it to
your needs.
+
+ <global-max-size>100Mb</global-max-size> -->
+
+ <!-- the maximum number of messages accepted before entering full
address mode.
+ if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
+ <global-max-messages>-1</global-max-messages>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+ <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
+ as duplicate detection requires
applicationProperties to be parsed on the server. -->
+ <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
+ default: 102400, -1 would mean to
disable large message control -->
+
+ <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
+ "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
+ See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
+
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://localhost:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+ <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
+ <acceptor
name="amqp">tcp://localhost:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor
name="stomp">tcp://localhost:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
+ <acceptor
name="hornetq">tcp://localhost:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor
name="mqtt">tcp://localhost:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+ <cluster-user>cluster-admin</cluster-user>
+
+ <cluster-password>password-admin</cluster-password>
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <connector-ref>artemis</connector-ref>
+ <retry-interval>100</retry-interval>
+
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
+ <max-hops>1</max-hops>
+ <static-connectors>
+ <connector-ref>node0</connector-ref>
+
+ </static-connectors>
+ </cluster-connection>
+ </cluster-connections>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+
+ <!-- if max-size-bytes and max-size-messages were both enabled,
the system will enter into paging
+ based on the first attribute to hit the maximum value -->
+ <!-- limit for the address in bytes, -1 means unlimited -->
+ <max-size-bytes>-1</max-size-bytes>
+
+ <!-- limit for the address in messages, -1 means unlimited -->
+ <max-size-messages>-1</max-size-messages>
+
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+
+ <!-- limit how many messages are read from paging into the Queue.
-->
+ <max-read-page-messages>-1</max-read-page-messages>
+
+ <!-- limit how much memory is read from paging into the Queue. -->
+ <max-read-page-bytes>20M</max-read-page-bytes>
+ <redistribution-delay>0</redistribution-delay>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-delete-queues>false</auto-delete-queues>
+ <auto-delete-addresses>false</auto-delete-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="ClusteredLargeMessageInterruptTest">
+ <anycast>
+ <queue name="ClusteredLargeMessageInterruptTest" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin plugin to log events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml
b/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml
new file mode 100644
index 0000000000..1d38e28ac9
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker1/management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<management-context xmlns="http://activemq.apache.org/schema">
+ <connector connector-port="1099"/>
+ <authorisation>
+ <allowlist>
+ <entry domain="hawtio"/>
+ </allowlist>
+ <default-access>
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </default-access>
+ <role-access>
+ <match domain="org.apache.activemq.artemis">
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <!-- Note: count and browse are needed to access the browse tab in
the console-->
+ <access method="browse*" roles="amq"/>
+ <access method="count*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </match>
+ <!--example of how to configure a specific object-->
+ <!--<match domain="org.apache.activemq.artemis"
key="subcomponent=queues">
+ <access method="list*" roles="view,update,amq"/>
+ <access method="get*" roles="view,update,amq"/>
+ <access method="is*" roles="view,update,amq"/>
+ <access method="set*" roles="update,amq"/>
+ <access method="*" roles="amq"/>
+ </match>-->
+ </role-access>
+ </authorisation>
+</management-context>
\ No newline at end of file
diff --git a/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml
b/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml
new file mode 100644
index 0000000000..b9c8793702
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker2/broker.xml
@@ -0,0 +1,283 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>lmbroker2</name>
+
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+
+ <!-- if you want to retain your journal uncomment this following
configuration.
+
+ This will allow your system to keep 7 days of your data, up to 10G.
Tweak it accordingly to your use case and capacity.
+
+ it is recommended to use a separate storage unit from the journal for
performance considerations.
+
+ <journal-retention-directory period="7" unit="DAYS"
storage-limit="10G">data/retention</journal-retention-directory>
+
+ You can also enable retention by using the argument journal-retention on
the `artemis create` command -->
+
+
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+ <connectors>
+ <!-- Connector used to be announced through cluster connections
and notifications -->
+ <connector name="artemis">tcp://localhost:61716</connector>
+ <connector name = "node0">tcp://localhost:61616</connector>
+ </connectors>
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect dead locks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+
+ <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for
the global-max-size.
+ You may specify a different value here if you need to customize it to
your needs.
+
+ <global-max-size>100Mb</global-max-size> -->
+
+ <!-- the maximum number of messages accepted before entering full
address mode.
+ if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
+ <global-max-messages>-1</global-max-messages>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+ <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
+ as duplicate detection requires
applicationProperties to be parsed on the server. -->
+ <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
+ default: 102400, -1 would mean to
disable large message control -->
+
+ <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
+ "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
+ See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
+
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://localhost:61716?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+ <!-- AMQP Acceptor. Listens on port 5772 (not the default AMQP port 5672) for AMQP traffic.-->
+ <acceptor
name="amqp">tcp://localhost:5772?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor
name="stomp">tcp://localhost:61713?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
+ <acceptor
name="hornetq">tcp://localhost:5545?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor
name="mqtt">tcp://localhost:1983?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+
+ <cluster-user>cluster-admin</cluster-user>
+
+ <cluster-password>password-admin</cluster-password>
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <connector-ref>artemis</connector-ref>
+ <retry-interval>100</retry-interval>
+
<message-load-balancing>OFF_WITH_REDISTRIBUTION</message-load-balancing>
+ <max-hops>1</max-hops>
+ <static-connectors>
+ <connector-ref>node0</connector-ref>
+
+ </static-connectors>
+ </cluster-connection>
+ </cluster-connections>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+
+ <!-- if max-size-bytes and max-size-messages were both enabled,
the system will enter into paging
+ based on the first attribute to hit the maximum value -->
+ <!-- limit for the address in bytes, -1 means unlimited -->
+ <max-size-bytes>-1</max-size-bytes>
+
+ <!-- limit for the address in messages, -1 means unlimited -->
+ <max-size-messages>-1</max-size-messages>
+
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+
+ <!-- limit how many messages are read from paging into the Queue.
-->
+ <max-read-page-messages>-1</max-read-page-messages>
+
+ <!-- limit how much memory is read from paging into the Queue. -->
+ <max-read-page-bytes>20M</max-read-page-bytes>
+
+ <redistribution-delay>0</redistribution-delay>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-delete-queues>false</auto-delete-queues>
+ <auto-delete-addresses>false</auto-delete-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="ClusteredLargeMessageInterruptTest">
+ <anycast>
+ <queue name="ClusteredLargeMessageInterruptTest" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin plugin to log events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml
b/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml
new file mode 100644
index 0000000000..4c2b254bd5
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lmbroker2/management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<management-context xmlns="http://activemq.apache.org/schema">
+ <connector connector-port="1199"/>
+ <authorisation>
+ <allowlist>
+ <entry domain="hawtio"/>
+ </allowlist>
+ <default-access>
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </default-access>
+ <role-access>
+ <match domain="org.apache.activemq.artemis">
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <!-- Note: count and browse are needed to access the browse tab in
the console-->
+ <access method="browse*" roles="amq"/>
+ <access method="count*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </match>
+ <!--example of how to configure a specific object-->
+ <!--<match domain="org.apache.activemq.artemis"
key="subcomponent=queues">
+ <access method="list*" roles="view,update,amq"/>
+ <access method="get*" roles="view,update,amq"/>
+ <access method="is*" roles="view,update,amq"/>
+ <access method="set*" roles="update,amq"/>
+ <access method="*" roles="amq"/>
+ </match>-->
+ </role-access>
+ </authorisation>
+</management-context>
\ No newline at end of file
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
new file mode 100644
index 0000000000..fd58bb0eab
--- /dev/null
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/ClusteredLargeMessageInterruptTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Names of the two server instances; passed to cleanupData, startServer and getServerLocation below.
+ public static final String SERVER_NAME_0 = "lmbroker1";
+ public static final String SERVER_NAME_1 = "lmbroker2";
+
+
+
+ // JMX endpoints used to obtain the QueueControl of each broker.
+ // 1199 matches the connector-port in servers/lmbroker2/management.xml;
+ // NOTE(review): 1099 is presumably the connector-port of lmbroker1 - confirm against its management.xml.
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT_0 = 1099;
+ private static final int JMX_SERVER_PORT_1 = 1199;
+
+ static String server1URI = "service:jmx:rmi:///jndi/rmi://" +
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+ static ObjectNameBuilder builderServer1 =
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(),
"lmbroker1", true);
+
+ static String server2URI = "service:jmx:rmi:///jndi/rmi://" +
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi";
+ static ObjectNameBuilder builderServer2 =
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(),
"lmbroker2", true);
+
+ // Stop flags polled by the sender / consumer loops; written by the test thread and read by the
+ // worker threads, hence volatile.
+ private volatile boolean runningSend = true;
+ private volatile boolean runningConsumer = true;
+ // Incremented by a consuming thread whenever a received body does not start with largebody.
+ private final AtomicInteger errors = new AtomicInteger(0);
+
+ // Payload sent by every producer. BODY_SIZE is a compile-time constant, so createBody() already
+ // sees 500 * 1024 even though the constant is declared after largebody.
+ static final String largebody = createBody();
+ static final int BODY_SIZE = 500 * 1024;
+
+ private static String createBody() {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < BODY_SIZE) {
+ buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY
CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T
REALLY CARE ");
+ }
+ return buffer.toString();
+ }
+
+ // Handles of the two broker processes; assigned in before() and re-assigned whenever a test
+ // kills and restarts a broker.
+ Process serverProcess;
+ Process serverProcess2;
+
+   /**
+    * Creates a JMS connection factory that targets one specific broker.
+    * <p>
+    * For {@code "CORE"} only broker 0 (port 61616) and broker 1 (port 61716) are accepted.
+    * Any other protocol is delegated to {@link CFUtil} using port {@code 61616 + broker * 100}.
+    *
+    * @param broker   index of the broker to connect to
+    * @param protocol protocol name; {@code "CORE"} is handled here, anything else by CFUtil
+    * @return the connection factory
+    * @throws IllegalArgumentException for a CORE factory with a broker other than 0 or 1
+    */
+   public ConnectionFactory createConnectionFactory(int broker, String protocol) {
+      if (!protocol.equals("CORE")) {
+         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false");
+      }
+
+      // I need the connections stable in the selected server
+      if (broker == 0) {
+         return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+      }
+      if (broker == 1) {
+         return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
+      }
+
+      logger.warn("undefined argument {}", broker);
+      throw new IllegalArgumentException("undefined");
+   }
+
+ /**
+ * Per-test setup: cleans the data of both servers, starts both broker processes and keeps
+ * their handles in serverProcess / serverProcess2.
+ */
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ cleanupData(SERVER_NAME_1);
+ serverProcess = startServer0();
+ serverProcess2 = startServer1();
+ // NOTE(review): inherited helper, not visible here - presumably turns off the base class
+ // thread-leak check because worker threads outlive the test body; confirm.
+ disableCheckThread();
+ }
+
+ /**
+ * Starts the first broker (SERVER_NAME_0) and returns its process handle.
+ * NOTE(review): the meaning of the 0 and 30000 arguments comes from the inherited startServer -
+ * presumably a port offset and a start timeout in milliseconds; confirm.
+ */
+ private Process startServer0() throws Exception {
+ return startServer(SERVER_NAME_0, 0, 30000);
+ }
+
+ /**
+ * Starts the second broker (SERVER_NAME_1) and returns its process handle.
+ * NOTE(review): the meaning of the 100 and 30000 arguments comes from the inherited startServer -
+ * presumably a port offset (the second broker is reached on 61716) and a start timeout in
+ * milliseconds; confirm.
+ */
+ private Process startServer1() throws Exception {
+ return startServer(SERVER_NAME_1, 100, 30000);
+ }
+
+ /** Runs the interrupt scenario over AMQP with transacted sessions. */
+ @Test
+ public void testLargeMessageAMQPTX() throws Throwable {
+ testInterrupt("AMQP", true);
+ }
+
+ /** Runs the interrupt scenario over AMQP with non-transacted (auto-acknowledge) sessions. */
+ @Test
+ public void testInterruptAMQPNonTX() throws Throwable {
+ testInterrupt("AMQP", false);
+ }
+
+ /** Runs the interrupt scenario over CORE with transacted sessions. */
+ @Test
+ public void testInterruptCORETX() throws Throwable {
+ testInterrupt("CORE", true);
+ }
+
+ /** Runs the interrupt scenario over OPENWIRE with transacted sessions. */
+ @Test
+ public void testInterruptOPENWIRETX() throws Throwable {
+ testInterrupt("OPENWIRE", true);
+ }
+
+ /** Runs the interrupt scenario over CORE with non-transacted (auto-acknowledge) sessions. */
+ @Test
+ public void testInterruptCORENonTX() throws Throwable {
+ testInterrupt("CORE", false);
+ }
+
+ private CountDownLatch startSendingThreads(Executor executor, String
protocol, int broker, int threads, boolean tx, String queueName) {
+ runningSend = true;
+ CountDownLatch done = new CountDownLatch(threads);
+
+ ConnectionFactory factory = createConnectionFactory(broker, protocol);
+ final CyclicBarrier startFlag = new CyclicBarrier(threads);
+
+ for (int i = 0; i < threads; i++) {
+ int threadID = i;
+ executor.execute(() -> {
+ int numberOfMessages = 0;
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (runningSend) {
+ producer.send(session.createTextMessage(largebody));
+ if (tx) {
+ session.commit();
+ }
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Sent {}", numberOfMessages);
+ }
+ }
+ } catch (Exception e) {
+ logger.info("Thread {} got an error {}", threadID,
e.getMessage());
+ } finally {
+ done.countDown();
+ logger.info("CountDown:: current Count {}", done.getCount());
+ }
+ });
+ }
+
+ return done;
+ }
+
+
+   /**
+    * Submits {@code threads} consumer tasks to {@code executor}. Each task opens its own connection
+    * to the selected broker and receives from {@code queueName} until {@code runningConsumer}
+    * becomes false or a call fails. A received body that does not start with {@code largebody}
+    * increments the {@code errors} counter of this test instance.
+    *
+    * @param executor  executor running the consumer tasks
+    * @param protocol  protocol handed to {@link #createConnectionFactory(int, String)}
+    * @param broker    index of the broker to consume from
+    * @param threads   number of consumer tasks
+    * @param tx        whether each message is committed on a transacted session
+    * @param queueName queue to consume from
+    * @return a latch that reaches zero once every consumer task has finished
+    */
+   private CountDownLatch startConsumingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
+      runningConsumer = true;
+      CountDownLatch done = new CountDownLatch(threads);
+
+      ConnectionFactory factory = createConnectionFactory(broker, protocol);
+      // makes all consumers start receiving at the same moment
+      final CyclicBarrier startFlag = new CyclicBarrier(threads);
+
+      for (int i = 0; i < threads; i++) {
+         executor.execute(() -> {
+            int numberOfMessages = 0;
+            try {
+               Connection connection = factory.createConnection();
+               connection.start();
+               Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+
+               startFlag.await(10, TimeUnit.SECONDS);
+               while (runningConsumer) {
+                  // short timeout so the loop notices runningConsumer == false quickly
+                  TextMessage message = (TextMessage)consumer.receive(100);
+                  if (message != null) {
+                     if (!message.getText().startsWith(largebody)) {
+                        logger.warn("Body does not match!");
+                        errors.incrementAndGet();
+                     }
+                     if (tx) {
+                        session.commit();
+                     }
+                     if (numberOfMessages++ % 10 == 0) {
+                        logger.info("Received {}", numberOfMessages);
+                     }
+                  }
+               }
+            } catch (Exception e) {
+               // failures are expected while the broker is being killed, but they must not vanish silently:
+               // report them the same way the sending threads do, with the stack trace available at debug level
+               logger.info("Consumer thread got an error {}", e.getMessage());
+               logger.debug(e.getMessage(), e);
+            } finally {
+               logger.info("Done receiving");
+               done.countDown();
+            }
+         });
+      }
+
+      return done;
+   }
+
+
+
+ // this test has sleeps as the test will send while still active
+ // we keep sending all the time.. so testInterrupt acts like a
controller telling the threads when to stop
+   /**
+    * Shared scenario: producers and consumers keep working while brokers are killed and restarted.
+    * <ol>
+    * <li>Producers and consumers run against broker 0 for two seconds, broker 0 is killed, the
+    * workers are stopped and broker 0 is restarted.</li>
+    * <li>Producers and consumers are started against broker 1, broker 1 is killed right away, the
+    * workers are stopped and broker 1 is restarted.</li>
+    * <li>Producers and consumers run against broker 1 for two seconds, the producers are stopped,
+    * both queues must drain to zero, then the consumers are stopped.</li>
+    * </ol>
+    * At the end the large-messages folder of both brokers must be empty and no consumer may have
+    * seen a mismatching body.
+    *
+    * @param protocol protocol used by producers and consumers
+    * @param tx       whether the sessions are transacted
+    */
+   private void testInterrupt(String protocol, boolean tx) throws Throwable {
+      final int SENDING_THREADS = 10;
+      final int CONSUMING_THREADS = 10;
+
+      String queueName = "ClusteredLargeMessageInterruptTest";
+
+      ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      // phase 1: work against broker 0, then kill it under load
+      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
+      CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+
+      serverProcess.destroyForcibly();
+      runningSend = false;
+      runningConsumer = false;
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      logger.info("All receivers and senders are done!!!");
+
+      serverProcess = startServer0();
+
+      Thread.sleep(2000);
+
+      // phase 2: work against broker 1 and kill it right away
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+      serverProcess2.destroyForcibly();
+      Assert.assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
+      runningSend = false;
+      runningConsumer = false;
+      Assert.assertTrue(sendDone.await(1, TimeUnit.MINUTES));
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+      serverProcess2 = startServer1();
+
+      // phase 3: both brokers are back, send for a while and then drain everything
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+      runningSend = false;
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
+      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      Wait.assertEquals(0, queueControl1::getMessageCount);
+      Wait.assertEquals(0, queueControl2::getMessageCount);
+
+      runningConsumer = false;
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
+      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");
+
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+      Wait.assertEquals(0, () -> lmFolder2.listFiles().length);
+      // "errors" is the instance counter incremented by the consuming threads on a body mismatch;
+      // it must not be shadowed by a local variable, otherwise this assertion can never fail
+      Assert.assertEquals(0, errors.get());
+   }
+
+ /** Runs the bridge-failure scenario over AMQP with non-transacted sessions. */
+ @Test
+ public void testBridgeFailureAMQP() throws Throwable {
+ testInterruptFailOnBridge("AMQP", false);
+ }
+
+ /** Runs the bridge-failure scenario over CORE with non-transacted sessions. */
+ @Test
+ public void testBridgeFailureCORE() throws Throwable {
+ testInterruptFailOnBridge("CORE", false);
+ }
+
+
+ // this is a slight variation of testInterrupt where I switch over
consumers before killing the previous node
+ // this is to force messages being redistributed and try to get the bridge
to fail.
+ // I could have played with a parameter but elected to copy instead for
simplicity
+   /**
+    * Shared scenario for the bridge-failure tests.
+    * <ol>
+    * <li>Only producers run against broker 0 for two seconds, then broker 0 is killed.</li>
+    * <li>Producers and consumers are started against broker 1 and broker 0 is restarted.</li>
+    * <li>After five seconds the producers are stopped; the large-messages folders and both queues
+    * must become empty, then the consumers are stopped.</li>
+    * </ol>
+    * At the end the large-messages folder of both brokers must be empty and no consumer may have
+    * seen a mismatching body.
+    *
+    * @param protocol protocol used by producers and consumers
+    * @param tx       whether the sessions are transacted
+    */
+   private void testInterruptFailOnBridge(String protocol, boolean tx) throws Throwable {
+      final int SENDING_THREADS = 10;
+      final int CONSUMING_THREADS = 10;
+
+      String queueName = "ClusteredLargeMessageInterruptTest";
+
+      ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
+      runAfter(executorService::shutdownNow);
+
+      // only start the sender for a while
+      CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
+
+      Thread.sleep(2000);
+
+      runningSend = runningConsumer = false;
+
+      serverProcess.destroyForcibly();
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
+      CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
+      // NOTE(review): server 0 was already destroyed and waited for above, so this second kill
+      // looks redundant - kept as is, confirm whether another process was meant here
+      serverProcess.destroyForcibly();
+      Assert.assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
+      serverProcess = startServer0();
+
+      Thread.sleep(5000);
+      runningSend = false;
+      Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+      QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
+      QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
+      File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");
+      // best-effort wait only; the folders are asserted again once the consumers are stopped
+      Wait.waitFor(() -> lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);
+      Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0);
+
+      runningConsumer = false;
+      Assert.assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
+
+
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+      Wait.assertEquals(0, () -> {
+         logger.info("queueControl2.count={}", queueControl2.getMessageCount());
+         return lmFolder2.listFiles().length;
+      });
+      // "errors" is the instance counter incremented by the consuming threads on a body mismatch;
+      // it must not be shadowed by a local variable, otherwise this assertion can never fail
+      Assert.assertEquals(0, errors.get());
+
+   }
+
+
+}
\ No newline at end of file