This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 3d671c40aa82fffbcd222f0ccf4daf0647c76d39 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Mon May 5 12:02:17 2025 -0400 ARTEMIS-5464 Simplifying LargeMessageInterruptTest This test was also failing intermittently. This will also take care of fixing the test. --- .../main/resources/servers/interruptlm/broker.xml | 10 +- .../interruptlm/LargeMessageInterruptTest.java | 196 +++++++++------------ 2 files changed, 88 insertions(+), 118 deletions(-) diff --git a/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml b/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml index 30f2d86ecb..fa492d9358 100644 --- a/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml +++ b/tests/soak-tests/src/main/resources/servers/interruptlm/broker.xml @@ -200,7 +200,7 @@ under the License. <max-size-bytes>-1</max-size-bytes> <!-- limit for the address in messages, -1 means unlimited --> - <max-size-messages>300</max-size-messages> + <max-size-messages>-1</max-size-messages> <!-- the size of each file on paging. Notice we keep files in memory while they are in use. Lower this setting if you have too many queues in memory. --> @@ -219,6 +219,9 @@ under the License. <auto-delete-queues>false</auto-delete-queues> <auto-delete-addresses>false</auto-delete-addresses> </address-setting> + <address-setting match="LargeMessageInterruptTestPaged"> + <max-size-messages>1</max-size-messages> + </address-setting> </address-settings> <addresses> @@ -237,6 +240,11 @@ under the License. 
<queue name="LargeMessageInterruptTest" /> </anycast> </address> + <address name="LargeMessageInterruptTestPaged"> + <anycast> + <queue name="LargeMessageInterruptTestPaged" /> + </anycast> + </address> </addresses> diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java index 326f9ba0ec..c6f98e1e88 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; @@ -56,7 +57,6 @@ public class LargeMessageInterruptTest extends SoakTestBase { public static final String SERVER_NAME_0 = "interruptlm"; - @BeforeAll public static void createServers() throws Exception { { @@ -64,14 +64,13 @@ public class LargeMessageInterruptTest extends SoakTestBase { deleteDirectory(serverLocation); HelperCreate cliCreateServer = helperCreate(); - cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation). + cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setArtemisInstance(serverLocation). 
setConfiguration("./src/main/resources/servers/interruptlm"); cliCreateServer.setArgs("--java-options", "-Djava.rmi.server.hostname=localhost", "--clustered", "--static-cluster", "tcp://localhost:61716", "--queues", "ClusteredLargeMessageInterruptTest", "--name", "lmbroker1"); cliCreateServer.createServer(); } } - private static final String JMX_SERVER_HOSTNAME = "localhost"; private static final int JMX_SERVER_PORT_0 = 1099; private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -79,10 +78,6 @@ public class LargeMessageInterruptTest extends SoakTestBase { static ObjectNameBuilder nameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "lminterrupt", true); Process serverProcess; - public ConnectionFactory createConnectionFactory(String protocol) { - return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); - } - @BeforeEach public void before() throws Exception { cleanupData(SERVER_NAME_0); @@ -90,85 +85,38 @@ public class LargeMessageInterruptTest extends SoakTestBase { disableCheckThread(); } - @Test - public void testInterruptLargeMessageAMQPTX() throws Throwable { - testInterruptLM("AMQP", true, false); - } - - @Test - public void testInterruptLargeMessageAMQPTXPaging() throws Throwable { - testInterruptLM("AMQP", true, true); - } - - @Test - public void testInterruptLargeMessageCORETX() throws Throwable { - testInterruptLM("CORE", true, false); - } - - @Test - public void testInterruptLargeMessageCORETXPaging() throws Throwable { - testInterruptLM("CORE", true, true); - } - - - @Test - public void testInterruptLargeMessageOPENWIRETX() throws Throwable { - testInterruptLM("OPENWIRE", true, false); - } - - @Test - public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable { - testInterruptLM("OPENWIRE", true, true); - } - - - @Test - public void testInterruptLargeMessageAMQPNonTX() throws Throwable { - testInterruptLM("AMQP", false, false); - } - - 
@Test - public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable { - testInterruptLM("AMQP", false, true); - } - - @Test - public void testInterruptLargeMessageCORENonTX() throws Throwable { - testInterruptLM("CORE", false, false); - } - - @Test - public void testInterruptLargeMessageCORENonTXPaging() throws Throwable { - testInterruptLM("CORE", false, true); - } - private void killProcess(Process process) throws Exception { Runtime.getRuntime().exec("kill -SIGINT " + process.pid()); } - - private void testInterruptLM(String protocol, boolean tx, boolean paging) throws Throwable { + @Test + public void testInterrupt() throws Throwable { final int BODY_SIZE = 500 * 1024; final int NUMBER_OF_MESSAGES = 10; // this is per producer - final int SENDING_THREADS = 10; + + // SENDING_THREADS must have the number of tasks submitted on executorService as we will use a CyclicBarrier to align + // start and other controls. If you change the logic on the loop please update this number here. 
+ final int SENDING_THREADS = 12; + CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS); final CountDownLatch done = new CountDownLatch(SENDING_THREADS); final AtomicInteger produced = new AtomicInteger(0); - final ConnectionFactory factory = createConnectionFactory(protocol); final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server - final CountDownLatch killAt = new CountDownLatch(40); + final CountDownLatch killAt = new CountDownLatch(10); ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS); runAfter(executorService::shutdownNow); String queueName = "LargeMessageInterruptTest"; + String pagedQueueName = "LargeMessageInterruptTestPaged"; String largebody = RandomUtil.randomAlphaNumericString(BODY_SIZE); - if (paging) { + { + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); try (Connection connection = factory.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = session.createProducer(session.createQueue(queueName)); + MessageProducer producer = session.createProducer(session.createQueue(pagedQueueName)); for (int i = 0; i < 1000; i++) { producer.send(session.createTextMessage("forcePage")); } @@ -176,60 +124,67 @@ public class LargeMessageInterruptTest extends SoakTestBase { } } - for (int i = 0; i < SENDING_THREADS; i++) { - executorService.execute(() -> { - int numberOfMessages = 0; - try { - Connection connection = factory.createConnection(); - Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(queueName)); + String[] protocols = new String[]{"CORE", "AMQP", "OPENWIRE"}; + String[] destinations = new String[] {queueName, pagedQueueName}; - startFlag.await(10, TimeUnit.SECONDS); - while (numberOfMessages < NUMBER_OF_MESSAGES) { + for (String queueUsed : destinations) { + for (String protocolUsed : protocols) { + for (int i = 0; i <= 1; i++) { + boolean tx = i > 0; + logger.info("sending protocol {}, destination {}, tx={}", protocolUsed, queueUsed, tx); + executorService.execute(() -> { + int numberOfMessages = 0; try { - producer.send(session.createTextMessage(largebody)); - if (tx) { - session.commit(); - } - produced.incrementAndGet(); - killAt.countDown(); - if (numberOfMessages++ % 10 == 0) { - logger.info("Sent {}", numberOfMessages); - } - } catch (Exception e) { - logger.warn(e.getMessage(), e); - - logger.warn(e.getMessage(), e); - try { - connection.close(); - } catch (Throwable ignored) { - } + final ConnectionFactory factory = CFUtil.createConnectionFactory(protocolUsed, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(queueUsed)); - for (int retryNumber = 0; retryNumber < 100; retryNumber++) { + startFlag.await(10, TimeUnit.SECONDS); + while (numberOfMessages < NUMBER_OF_MESSAGES) { try { - Connection ctest = factory.createConnection(); - ctest.close(); - break; - } catch (Throwable retry) { - Thread.sleep(100); + producer.send(session.createTextMessage(largebody)); + if (tx) { + session.commit(); + } + produced.incrementAndGet(); + killAt.countDown(); + if (numberOfMessages++ % 10 == 0) { + logger.info("Sent {}", numberOfMessages); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + try { + connection.close(); + } catch (Throwable ignored) { + } + + for (int retryNumber = 0; retryNumber < 100; retryNumber++) { + try { + connection = factory.createConnection(); + } catch (Throwable retry) { + connection = null; + Thread.sleep(500); + } + } + + assertNotNull(connection, "retry did not work on createConnection"); + session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(session.createQueue(queueName)); + connection.start(); + } } - - connection = factory.createConnection(); - session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(session.createQueue(queueName)); - connection.start(); - + } catch (Throwable e) { + logger.warn("Error while sending protocol {}, destination {}, tx={}", protocolUsed, queueUsed, tx, e); + errors.incrementAndGet(); } - } - } catch (Exception e) { - logger.warn("Error getting the initial connection", e); - errors.incrementAndGet(); - } - logger.info("Done sending"); - done.countDown(); - }); + logger.info("Done sending"); + done.countDown(); + }); + } + } } assertTrue(killAt.await(60, TimeUnit.SECONDS)); @@ -240,11 +195,23 @@ public class LargeMessageInterruptTest extends SoakTestBase { assertTrue(done.await(60, TimeUnit.SECONDS)); assertEquals(0, errors.get()); + ServerUtil.waitForServerToStart(0, 60_000); + + verifyQueue(queueName, largebody); + verifyQueue(pagedQueueName, largebody); + + File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); + assertTrue(lmFolder.exists()); + Wait.assertEquals(0, () -> lmFolder.listFiles().length); + } + + private static void verifyQueue(String queueName, String largebody) throws Throwable { QueueControl queueControl = getQueueControl(liveURI, nameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000); long numberOfMessages = queueControl.getMessageCount(); logger.info("there are {} messages", numberOfMessages); + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); try (Connection connection = factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); @@ -255,11 +222,6 @@ public class LargeMessageInterruptTest extends SoakTestBase { assertTrue(message.getText().equals("forcePage") || message.getText().equals(largebody)); } } - - File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages"); - 
assertTrue(lmFolder.exists()); - Wait.assertEquals(0, () -> lmFolder.listFiles().length); - } } \ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact