This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ed5f63538e ARTEMIS-4155 Fixing deadlock on LargeMessage conversion and
retention
ed5f63538e is described below
commit ed5f63538e86529348500a4d524c47b3ffbc0951
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Feb 3 12:03:30 2023 -0500
ARTEMIS-4155 Fixing deadlock on LargeMessage conversion and retention
---
.../impl/journal/JournalStorageManager.java | 2 +-
.../artemis/core/server/replay/ReplayManager.java | 5 +-
tests/soak-tests/pom.xml | 28 +++
.../servers/replay/large-message/broker.xml | 246 +++++++++++++++++++++
.../servers/replay/large-message/management.xml | 52 +++++
.../soak/retention/LargeMessageRetentionTest.java | 205 +++++++++++++++++
6 files changed, 536 insertions(+), 2 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index cc645cb285..aaecba0702 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -878,7 +878,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
private void historyBody(long messageId, EncodingSupport partialBuffer) {
try {
- messageJournal.appendAddEvent(messageId,
JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(),
partialBuffer, true, null);
+ messageJournal.appendAddEvent(messageId,
JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(),
partialBuffer, false, null);
} catch (Exception e) {
logger.warn("Error processing history large message body for {}",
messageId, e);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
index a425b83b6f..147d1d0cc2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java
@@ -70,7 +70,7 @@ public class ReplayManager {
}
public void replay(Date start, Date end, String sourceAddress, String
targetAddressParameter, String filterStr) throws Exception {
- logger.debug("Replay::{}", sourceAddress);
+ logger.debug("Replay start::sourceAddress={}", sourceAddress);
if (sourceAddress == null) {
throw new NullPointerException("sourceAddress");
@@ -129,6 +129,7 @@ public class ReplayManager {
continue;
}
}
+ logger.debug("Reading retention file {}", file);
JournalImpl.readJournalFile(messagesFF, file, new
JournalReaderCallback() {
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
@@ -179,6 +180,8 @@ public class ReplayManager {
}, null, false, null);
}
+
+ logger.debug("Replay done::sourceAddress={}", sourceAddress);
}
private boolean messageMatch(Filter filter, Message message, String
sourceAddress, String targetAddress) {
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 0fd14abfb7..4b3e65f42f 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -219,6 +219,34 @@
</args>
</configuration>
</execution>
+ <!-- Used on TestRetention -->
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-lmreplay</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>false</noWeb>
+
<instance>${basedir}/target/replay/large-message</instance>
+
<configuration>${basedir}/target/classes/servers/replay/large-message</configuration>
+ <args>
+ <!-- this is needed to run the server remotely -->
+ <arg>--java-options</arg>
+ <arg>-Djava.rmi.server.hostname=localhost</arg>
+ <arg>--journal-retention</arg>
+ <arg>1</arg>
+ <arg>--queues</arg>
+ <arg>RetentionTest</arg>
+ <arg>--name</arg>
+ <arg>large-message</arg>
+ </args>
+ </configuration>
+ </execution>
</executions>
</plugin>
diff --git
a/tests/soak-tests/src/main/resources/servers/replay/large-message/broker.xml
b/tests/soak-tests/src/main/resources/servers/replay/large-message/broker.xml
new file mode 100644
index 0000000000..74890e13b0
--- /dev/null
+++
b/tests/soak-tests/src/main/resources/servers/replay/large-message/broker.xml
@@ -0,0 +1,246 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>replay</name>
+
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-retention-directory period="1"
unit="DAYS">./data/retention</journal-retention-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <!-- using a small journal-buffer-size to allow the message to be
converted before journal persistence -->
+ <journal-buffer-size>20k</journal-buffer-size>
+
+ <journal-file-size>500K</journal-file-size>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect dead locks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+
+ <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for
the global-max-size.
+ You may specify a different value here if you need to customize it to
your needs.
+
+ <global-max-size>100Mb</global-max-size> -->
+
+ <!-- the maximum number of messages accepted before entering full
address mode.
+ if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
+ <global-max-messages>-1</global-max-messages>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+ <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
+ as duplicate detection requires
applicationProperties to be parsed on the server. -->
+ <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
+ default: 102400, -1 would mean to
disable large mesasge control -->
+
+ <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
+ "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
+ See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
+
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
+
+ <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
+ <acceptor
name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor
name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
+ <acceptor
name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor
name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+
+ <!-- disabling paging -->
+ <max-size-messages>-1</max-size-messages>
+
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+
+ <!-- limit how many messages are read from paging into the Queue.
-->
+ <max-read-page-messages>-1</max-read-page-messages>
+
+ <!-- limit how much memory is read from paging into the Queue. -->
+ <max-read-page-bytes>20M</max-read-page-bytes>
+
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-delete-queues>false</auto-delete-queues>
+ <auto-delete-addresses>false</auto-delete-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="RetentionTest">
+ <anycast>
+ <queue name="RetentionTest" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin pluging to log in events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/soak-tests/src/main/resources/servers/replay/large-message/management.xml
b/tests/soak-tests/src/main/resources/servers/replay/large-message/management.xml
new file mode 100644
index 0000000000..1d38e28ac9
--- /dev/null
+++
b/tests/soak-tests/src/main/resources/servers/replay/large-message/management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<management-context xmlns="http://activemq.apache.org/schema">
+ <connector connector-port="1099"/>
+ <authorisation>
+ <allowlist>
+ <entry domain="hawtio"/>
+ </allowlist>
+ <default-access>
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </default-access>
+ <role-access>
+ <match domain="org.apache.activemq.artemis">
+ <access method="list*" roles="amq"/>
+ <access method="get*" roles="amq"/>
+ <access method="is*" roles="amq"/>
+ <access method="set*" roles="amq"/>
+ <!-- Note count and browse are need to access the browse tab in
the console-->
+ <access method="browse*" roles="amq"/>
+ <access method="count*" roles="amq"/>
+ <access method="*" roles="amq"/>
+ </match>
+ <!--example of how to configure a specific object-->
+ <!--<match domain="org.apache.activemq.artemis"
key="subcomponent=queues">
+ <access method="list*" roles="view,update,amq"/>
+ <access method="get*" roles="view,update,amq"/>
+ <access method="is*" roles="view,update,amq"/>
+ <access method="set*" roles="update,amq"/>
+ <access method="*" roles="amq"/>
+ </match>-->
+ </role-access>
+ </authorisation>
+</management-context>
\ No newline at end of file
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/retention/LargeMessageRetentionTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/retention/LargeMessageRetentionTest.java
new file mode 100644
index 0000000000..152bf25945
--- /dev/null
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/retention/LargeMessageRetentionTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.retention;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// The server used by this test has the journal retention configured.
+// The server should not enter into a deadlock state just because retention is
being used.
+// The focus of this test is to make sure all messages are sent and received
normally
+public class LargeMessageRetentionTest extends SoakTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT_0 = 1099;
+ static String liveURI = "service:jmx:rmi:///jndi/rmi://" +
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+ static ObjectNameBuilder liveNameBuilder =
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(),
"replay", true);
+
+ public static final String SERVER_NAME_0 = "replay/large-message";
+
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ startServer(SERVER_NAME_0, 0, 30000);
+ disableCheckThread();
+ }
+
+ @Test
+ public void testRetentionOpenWire() throws Throwable {
+ testRetention("OPENWIRE", 100, 10, 200 * 1024, 10);
+ }
+
+ @Test
+ public void testRetentionAMQP() throws Throwable {
+ testRetention("AMQP", 100, 10, 50 * 1024, 10);
+ }
+
+ @Test
+ public void testRetentionAMQPRealLarge() throws Throwable {
+ testRetention("AMQP", 100, 10, 300 * 1024, 10);
+ }
+
+ // in this case messages are not really > min-large-message-size, but they
will be converted because of the journal small buffer size
+ @Test
+ public void testRetentionCore() throws Throwable {
+ testRetention("CORE", 100, 10, 50 * 1024, 10);
+ }
+
+ // in this case the messages are actually large
+ @Test
+ public void testRetentionCoreRealLarge() throws Throwable {
+ testRetention("CORE", 100, 10, 300 * 1024, 10);
+ }
+
+ private void testRetention(String protocol, int NUMBER_OF_MESSAGES, int
backlog, int bodySize, int producers) throws Throwable {
+ Assert.assertTrue(NUMBER_OF_MESSAGES % producers == 0); // checking that
it is a multiple
+
+ ActiveMQServerControl serverControl = getServerControl(liveURI,
liveNameBuilder, 5000);
+
+ final Semaphore consumerCredits = new Semaphore(-backlog);
+ final String queueName = "RetentionTest";
+ final AtomicInteger errors = new AtomicInteger(0);
+ final CountDownLatch latchReceiver = new CountDownLatch(1);
+ final CountDownLatch latchSender = new CountDownLatch(producers);
+
+ String bufferStr;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < bodySize; i++) {
+ buffer.append("*");
+ }
+ bufferStr = RandomUtil.randomString() + buffer;
+ }
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ ExecutorService executor = Executors.newFixedThreadPool(1 * producers);
+ runAfter(executor::shutdownNow);
+
+ executor.execute(() -> {
+ try (Connection consumerConnection = factory.createConnection()) {
+ HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSession.createQueue(queueName);
+ consumerConnection.start();
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ logger.debug("Acquiring semop at {}", i);
+ Assert.assertTrue(consumerCredits.tryAcquire(1,
TimeUnit.MINUTES));
+ TextMessage message = (TextMessage) consumer.receive(60_000);
+ Assert.assertNotNull(message);
+ int producerI = message.getIntProperty("producerI");
+ AtomicInteger messageSequence = messageSequences.get(producerI);
+ if (messageSequence == null) {
+ messageSequence = new AtomicInteger(0);
+ messageSequences.put(producerI, messageSequence);
+ }
+ Assert.assertEquals(messageSequence.getAndIncrement(),
message.getIntProperty("messageI"));
+ logger.info("Received message {}", i);
+ Assert.assertEquals(bufferStr, message.getText());
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ latchReceiver.countDown();
+ }
+ });
+
+ for (int producerID = 0; producerID < producers; producerID++) {
+ int theProducerID = producerID; // to be used within the executor's
inner method
+ executor.execute(() -> {
+ try (Connection producerConnection = factory.createConnection()) {
+ Session producerSession =
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = producerSession.createQueue(queueName);
+ MessageProducer producer =
producerSession.createProducer(queue);
+
+ for (int messageI = 0; messageI < NUMBER_OF_MESSAGES /
producers; messageI++) {
+ logger.info("Sending message {} from producerID", messageI,
theProducerID);
+ Message message =
producerSession.createTextMessage(bufferStr);
+ message.setIntProperty("messageI", messageI);
+ message.setIntProperty("producerI", theProducerID);
+ producer.send(message);
+ consumerCredits.release();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ latchSender.countDown();
+ }
+ });
+
+ }
+
+ Assert.assertTrue(latchSender.await(10, TimeUnit.MINUTES));
+ consumerCredits.release(backlog);
+ Assert.assertTrue(latchReceiver.await(10, TimeUnit.MINUTES));
+ Assert.assertEquals(0, errors.get());
+
+ try (Connection consumerConnection = factory.createConnection()) {
+ HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSession.createQueue(queueName);
+ consumerConnection.start();
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ Assert.assertNull(consumer.receiveNoWait());
+
+ serverControl.replay(queueName, queueName, "producerI=0 AND
messageI>=0 AND messageI<10");
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = (TextMessage) consumer.receive(300_000);
+ Assert.assertNotNull(message);
+ logger.info("Received replay message {}", i);
+ Assert.assertEquals(0, message.getIntProperty("producerI"));
+ Assert.assertEquals(i, message.getIntProperty("messageI"));
+ Assert.assertEquals(bufferStr, message.getText());
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
+}