This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 3d671c40aa82fffbcd222f0ccf4daf0647c76d39
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Mon May 5 12:02:17 2025 -0400

    ARTEMIS-5464 Simplifying LargeMessageInterruptTest
    
    This test was also failing intermittently. The simplification
    should also fix those intermittent failures.
---
 .../main/resources/servers/interruptlm/broker.xml  |  10 +-
 .../interruptlm/LargeMessageInterruptTest.java     | 196 +++++++++------------
 2 files changed, 88 insertions(+), 118 deletions(-)

diff --git a/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml 
b/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml
index 30f2d86ecb..fa492d9358 100644
--- a/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml
+++ b/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml
@@ -200,7 +200,7 @@ under the License.
             <max-size-bytes>-1</max-size-bytes>
 
             <!-- limit for the address in messages, -1 means unlimited -->
-            <max-size-messages>300</max-size-messages>
+            <max-size-messages>-1</max-size-messages>
 
             <!-- the size of each file on paging. Notice we keep files in 
memory while they are in use.
                  Lower this setting if you have too many queues in memory. -->
@@ -219,6 +219,9 @@ under the License.
             <auto-delete-queues>false</auto-delete-queues>
             <auto-delete-addresses>false</auto-delete-addresses>
          </address-setting>
+         <address-setting match="LargeMessageInterruptTestPaged">
+            <max-size-messages>1</max-size-messages>
+         </address-setting>
       </address-settings>
 
       <addresses>
@@ -237,6 +240,11 @@ under the License.
                <queue name="LargeMessageInterruptTest" />
             </anycast>
          </address>
+         <address name="LargeMessageInterruptTestPaged">
+            <anycast>
+               <queue name="LargeMessageInterruptTestPaged" />
+            </anycast>
+         </address>
 
       </addresses>
 
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
index 326f9ba0ec..c6f98e1e88 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
@@ -42,6 +42,7 @@ import 
org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
@@ -56,7 +57,6 @@ public class LargeMessageInterruptTest extends SoakTestBase {
 
    public static final String SERVER_NAME_0 = "interruptlm";
 
-
    @BeforeAll
    public static void createServers() throws Exception {
       {
@@ -64,14 +64,13 @@ public class LargeMessageInterruptTest extends SoakTestBase 
{
          deleteDirectory(serverLocation);
 
          HelperCreate cliCreateServer = helperCreate();
-         
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation).
+         
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setArtemisInstance(serverLocation).
             setConfiguration("./src/main/resources/servers/interruptlm");
          cliCreateServer.setArgs("--java-options", 
"-Djava.rmi.server.hostname=localhost", "--clustered", "--static-cluster", 
"tcp://localhost:61716", "--queues", "ClusteredLargeMessageInterruptTest", 
"--name", "lmbroker1");
          cliCreateServer.createServer();
       }
    }
 
-
    private static final String JMX_SERVER_HOSTNAME = "localhost";
    private static final int JMX_SERVER_PORT_0 = 1099;
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -79,10 +78,6 @@ public class LargeMessageInterruptTest extends SoakTestBase {
    static ObjectNameBuilder nameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lminterrupt", true);
    Process serverProcess;
 
-   public ConnectionFactory createConnectionFactory(String protocol) {
-      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
-   }
-
    @BeforeEach
    public void before() throws Exception {
       cleanupData(SERVER_NAME_0);
@@ -90,85 +85,38 @@ public class LargeMessageInterruptTest extends SoakTestBase 
{
       disableCheckThread();
    }
 
-   @Test
-   public void testInterruptLargeMessageAMQPTX() throws Throwable {
-      testInterruptLM("AMQP", true, false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
-      testInterruptLM("AMQP", true, true);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORETX() throws Throwable {
-      testInterruptLM("CORE", true, false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORETXPaging() throws Throwable {
-      testInterruptLM("CORE", true, true);
-   }
-
-
-   @Test
-   public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
-      testInterruptLM("OPENWIRE", true, false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
-      testInterruptLM("OPENWIRE", true, true);
-   }
-
-
-   @Test
-   public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
-      testInterruptLM("AMQP", false, false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
-      testInterruptLM("AMQP", false, true);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORENonTX() throws Throwable {
-      testInterruptLM("CORE", false, false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
-      testInterruptLM("CORE", false, true);
-   }
-
    private void killProcess(Process process) throws Exception {
       Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
    }
 
-
-   private void testInterruptLM(String protocol, boolean tx, boolean paging) 
throws Throwable {
+   @Test
+   public void testInterrupt() throws Throwable {
       final int BODY_SIZE = 500 * 1024;
       final int NUMBER_OF_MESSAGES = 10; // this is per producer
-      final int SENDING_THREADS = 10;
+
+      // SENDING_THREADS must equal the number of tasks submitted to 
executorService: it sizes the CyclicBarrier that aligns their start,
+      // the 'done' latch and the thread pool. If you change the loops below, please 
update this number here.
+      final int SENDING_THREADS = 12;
+
       CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
       final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
       final AtomicInteger produced = new AtomicInteger(0);
-      final ConnectionFactory factory = createConnectionFactory(protocol);
       final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
-      final CountDownLatch killAt = new CountDownLatch(40);
+      final CountDownLatch killAt = new CountDownLatch(10);
 
       ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS);
       runAfter(executorService::shutdownNow);
 
       String queueName = "LargeMessageInterruptTest";
+      String pagedQueueName = "LargeMessageInterruptTestPaged";
 
       String largebody = RandomUtil.randomAlphaNumericString(BODY_SIZE);
 
-      if (paging) {
+      {
+         ConnectionFactory factory = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
          try (Connection connection = factory.createConnection()) {
             Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+            MessageProducer producer = 
session.createProducer(session.createQueue(pagedQueueName));
             for (int i = 0; i < 1000; i++) {
                producer.send(session.createTextMessage("forcePage"));
             }
@@ -176,60 +124,67 @@ public class LargeMessageInterruptTest extends 
SoakTestBase {
          }
       }
 
-      for (int i = 0; i < SENDING_THREADS; i++) {
-         executorService.execute(() -> {
-            int numberOfMessages = 0;
-            try {
-               Connection connection = factory.createConnection();
-               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+      String[] protocols = new String[]{"CORE", "AMQP", "OPENWIRE"};
+      String[] destinations = new String[] {queueName, pagedQueueName};
 
-               startFlag.await(10, TimeUnit.SECONDS);
-               while (numberOfMessages < NUMBER_OF_MESSAGES) {
+      for (String queueUsed : destinations) {
+         for (String protocolUsed : protocols) {
+            for (int i = 0; i <= 1; i++) {
+               boolean tx = i > 0;
+               logger.info("sending protocol {}, destination {}, tx={}", 
protocolUsed, queueUsed, tx);
+               executorService.execute(() -> {
+                  int numberOfMessages = 0;
                   try {
-                     producer.send(session.createTextMessage(largebody));
-                     if (tx) {
-                        session.commit();
-                     }
-                     produced.incrementAndGet();
-                     killAt.countDown();
-                     if (numberOfMessages++ % 10 == 0) {
-                        logger.info("Sent {}", numberOfMessages);
-                     }
-                  } catch (Exception e) {
-                     logger.warn(e.getMessage(), e);
-
-                     logger.warn(e.getMessage(), e);
-                     try {
-                        connection.close();
-                     } catch (Throwable ignored) {
-                     }
+                     final ConnectionFactory factory = 
CFUtil.createConnectionFactory(protocolUsed, "tcp://localhost:61616");
+                     Connection connection = factory.createConnection();
+                     Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                     MessageProducer producer = 
session.createProducer(session.createQueue(queueUsed));
 
-                     for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
+                     startFlag.await(10, TimeUnit.SECONDS);
+                     while (numberOfMessages < NUMBER_OF_MESSAGES) {
                         try {
-                           Connection ctest = factory.createConnection();
-                           ctest.close();
-                           break;
-                        } catch (Throwable retry) {
-                           Thread.sleep(100);
+                           producer.send(session.createTextMessage(largebody));
+                           if (tx) {
+                              session.commit();
+                           }
+                           produced.incrementAndGet();
+                           killAt.countDown();
+                           if (numberOfMessages++ % 10 == 0) {
+                              logger.info("Sent {}", numberOfMessages);
+                           }
+                        } catch (Exception e) {
+                           logger.warn(e.getMessage(), e);
+                           try {
+                              connection.close();
+                           } catch (Throwable ignored) {
+                           }
+
+                           for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
+                              try {
+                                 connection = factory.createConnection();
+                              } catch (Throwable retry) {
+                                 connection = null;
+                                 Thread.sleep(500);
+                              }
+                           }
+
+                           assertNotNull(connection, "retry did not work on 
createConnection");
+                           session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                           producer = 
session.createProducer(session.createQueue(queueName));
+                           connection.start();
+
                         }
                      }
-
-                     connection = factory.createConnection();
-                     session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-                     producer = 
session.createProducer(session.createQueue(queueName));
-                     connection.start();
-
+                  } catch (Throwable e) {
+                     logger.warn("Error while sending protocol {}, destination 
{}, tx={}", protocolUsed, queueUsed, tx, e);
+                     errors.incrementAndGet();
                   }
-               }
-            } catch (Exception e) {
-               logger.warn("Error getting the initial connection", e);
-               errors.incrementAndGet();
-            }
 
-            logger.info("Done sending");
-            done.countDown();
-         });
+                  logger.info("Done sending");
+                  done.countDown();
+               });
+            }
+         }
       }
 
       assertTrue(killAt.await(60, TimeUnit.SECONDS));
@@ -240,11 +195,23 @@ public class LargeMessageInterruptTest extends 
SoakTestBase {
       assertTrue(done.await(60, TimeUnit.SECONDS));
       assertEquals(0, errors.get());
 
+      ServerUtil.waitForServerToStart(0, 60_000);
+
+      verifyQueue(queueName, largebody);
+      verifyQueue(pagedQueueName, largebody);
+
+      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
+      assertTrue(lmFolder.exists());
+      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+   }
+
+   private static void verifyQueue(String queueName, String largebody) throws 
Throwable {
       QueueControl queueControl = getQueueControl(liveURI, nameBuilder, 
queueName, queueName, RoutingType.ANYCAST, 5000);
 
       long numberOfMessages = queueControl.getMessageCount();
       logger.info("there are {} messages", numberOfMessages);
 
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
          MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
@@ -255,11 +222,6 @@ public class LargeMessageInterruptTest extends 
SoakTestBase {
             assertTrue(message.getText().equals("forcePage") || 
message.getText().equals(largebody));
          }
       }
-
-      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
-      assertTrue(lmFolder.exists());
-      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
-
    }
 
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to