This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1caa406bbf NO-JIRA Adding a test to verify TX in OpenWire Large Message Handling
1caa406bbf is described below

commit 1caa406bbf2b2492bf7d0789f8a54ea570a0251c
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Feb 8 09:27:29 2023 -0500

    NO-JIRA Adding a test to verify TX in OpenWire Large Message Handling
---
 .../openwire/OpenWireLargeMessageTest.java         | 83 ++++++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
index 80aecbbe7f..02e346ad13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
@@ -25,7 +25,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -40,9 +46,13 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class OpenWireLargeMessageTest extends BasicOpenWireTest {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    public OpenWireLargeMessageTest() {
       super();
    }
@@ -178,6 +188,79 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
    }
 
 
+   /**
+    * Sends and receives 400 large text messages (each at least 1M characters) over transacted
+    * sessions, committing roughly every 100 messages, with the consumer on a separate thread.
+    */
+   @Test
+   public void testSendReceiveLargeMessageTX() throws Exception {
+      int numberOfMessages = 400;
+      int txSize = 100;
+
+      ExecutorService executorService = Executors.newFixedThreadPool(1);
+      runAfter(executorService::shutdownNow);
+
+      AtomicInteger errors = new AtomicInteger(0);
+      CountDownLatch latch = new CountDownLatch(1);
+
+      // Build the payload by repeating a random string until it reaches 1M characters
+      String randomString = "This is a random String " + RandomUtil.randomString();
+      StringBuilder largeBuffer = new StringBuilder();
+      while (largeBuffer.length() < 1024 * 1024) {
+         largeBuffer.append(randomString);
+      }
+      String largeString = largeBuffer.toString();
+
+      // The consumer runs on the executor so it can receive while the producer below is sending
+      executorService.execute(() -> {
+         try (Connection connection = factory.createConnection()) {
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(lmAddress.toString());
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int received = 0; received < numberOfMessages; received++) {
+               TextMessage m = (TextMessage) consumer.receive(5000);
+               // receive(timeout) returns null on timeout; report that clearly instead of an NPE
+               Assert.assertNotNull("Timed out waiting for message " + received, m);
+               assertEquals(largeString, m.getText());
+               if (received > 0 && received % txSize == 0) {
+                  logger.info("Received {} messages", received);
+                  session.commit();
+               }
+            }
+            session.commit();
+         } catch (Throwable e) {
+            // Failures here happen on the executor thread; count them for the test thread to check
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         } finally {
+            latch.countDown();
+         }
+      });
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(lmAddress.toString());
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int sent = 0; sent < numberOfMessages; sent++) {
+            TextMessage message = session.createTextMessage(largeString);
+            producer.send(message);
+            if (sent > 0 && sent % txSize == 0) {
+               logger.info("Sent {} messages", sent);
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+
+      // A timed-out await must fail the test: errors stays 0 if the consumer never finished
+      Assert.assertTrue("Consumer did not finish within 1 minute", latch.await(1, TimeUnit.MINUTES));
+      Assert.assertEquals(0, errors.get());
+   }
+
 
    @Override
    protected void extraServerConfig(Configuration serverConfig) {

Reply via email to