This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1caa406bbf NO-JIRA Adding a test to verify TX in OpenWire Large Message Handling
1caa406bbf is described below
commit 1caa406bbf2b2492bf7d0789f8a54ea570a0251c
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Feb 8 09:27:29 2023 -0500
NO-JIRA Adding a test to verify TX in OpenWire Large Message Handling
---
.../openwire/OpenWireLargeMessageTest.java | 83 ++++++++++++++++++++++
1 file changed, 83 insertions(+)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
index 80aecbbe7f..02e346ad13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
@@ -25,7 +25,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -40,9 +46,13 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class OpenWireLargeMessageTest extends BasicOpenWireTest {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/** Creates the test instance; all setup is delegated to the {@code BasicOpenWireTest} constructor. */
public OpenWireLargeMessageTest() {
super();
}
@@ -178,6 +188,79 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
}
+ /**
+  * Sends and receives large text messages over OpenWire with transacted sessions on both sides.
+  * <p>
+  * A background task consumes {@code numberOfMessages} messages while the test thread produces
+  * them; each side commits periodically and once more after its loop. Any failure in the
+  * consumer task is counted in {@code errors}. The test passes only if the consumer task
+  * finishes within one minute and reported no errors.
+  *
+  * @throws Exception if the producer side fails or the wait for the consumer is interrupted
+  */
+ @Test
+ public void testSendReceiveLargeMessageTX() throws Exception {
+    final int numberOfMessages = 400;
+    final int txSize = 100;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(1);
+    runAfter(executorService::shutdownNow);
+
+    AtomicInteger errors = new AtomicInteger(0);
+    CountDownLatch latch = new CountDownLatch(1);
+
+    // Build a payload of at least 1MB by repeating a randomised fragment.
+    final String largeString;
+
+    {
+       String randomString = "This is a random String " + RandomUtil.randomString();
+       // Thread-confined buffer, so the unsynchronised StringBuilder is sufficient.
+       StringBuilder largeBuffer = new StringBuilder();
+       while (largeBuffer.length() < 1024 * 1024) {
+          largeBuffer.append(randomString);
+       }
+
+       largeString = largeBuffer.toString();
+    }
+
+    // Consumer: runs concurrently with the producer below.
+    executorService.execute(() -> {
+
+       try (Connection connection = factory.createConnection()) {
+          connection.start();
+          Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+          Queue queue = session.createQueue(lmAddress.toString());
+          MessageConsumer consumer = session.createConsumer(queue);
+          for (int received = 0; received < numberOfMessages; received++) {
+             TextMessage m = (TextMessage) consumer.receive(5000);
+             // receive(timeout) yields null when nothing arrives; report that clearly rather than via an NPE.
+             Assert.assertNotNull("Timed out waiting for message " + received, m);
+             assertEquals(largeString, m.getText());
+             // Intermediate commit whenever the loop index is a non-zero multiple of txSize.
+             if (received > 0 && received % txSize == 0) {
+                logger.info("Received {} messages", received);
+                session.commit();
+             }
+          }
+          // Commit whatever is left in the last, possibly partial, transaction.
+          session.commit();
+       } catch (Throwable e) {
+          // Assertion failures on this thread would otherwise be lost; count them for the test thread.
+          logger.warn(e.getMessage(), e);
+          errors.incrementAndGet();
+       } finally {
+          latch.countDown();
+       }
+
+    });
+
+
+    // Producer: runs on the test thread.
+    try (Connection connection = factory.createConnection()) {
+       connection.start();
+
+       Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+       Queue queue = session.createQueue(lmAddress.toString());
+       MessageProducer producer = session.createProducer(queue);
+       producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+       for (int sent = 0; sent < numberOfMessages; sent++) {
+          TextMessage message = session.createTextMessage(largeString);
+          producer.send(message);
+          // Same commit cadence as the consumer side.
+          if (sent > 0 && sent % txSize == 0) {
+             logger.info("Sent {} messages", sent);
+             session.commit();
+          }
+       }
+       session.commit();
+    }
+
+    // await() returns false on timeout; without this check a stuck consumer would leave
+    // errors at 0 and the test would pass without having verified anything.
+    Assert.assertTrue("Consumer did not finish within 1 minute", latch.await(1, TimeUnit.MINUTES));
+    Assert.assertEquals(0, errors.get());
+ }
+
@Override
protected void extraServerConfig(Configuration serverConfig) {