This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b41556d8ef08d3d16225c4a5405fdc7c1886daaf Author: Clebert Suconic <[email protected]> AuthorDate: Mon Apr 7 17:06:54 2025 -0400 ARTEMIS-5387 optimize HorizontalPagingTest --- .../tests/soak/paging/HorizontalPagingTest.java | 174 +++++++++------------ .../src/test/scripts/longrun-parameters.sh | 26 +-- tests/soak-tests/src/test/scripts/parameters.sh | 26 +-- 3 files changed, 85 insertions(+), 141 deletions(-) diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java index 2015da2d11..1e4081fc87 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java @@ -38,8 +38,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; -import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.RandomUtil; @@ -48,24 +46,21 @@ import org.apache.activemq.artemis.utils.TestParameters; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.lang.invoke.MethodHandles; /** * Refer to ./scripts/parameters.sh for suggested parameters #You may choose to use zip files to save some time on * producing if you want to run this test over and over when debugging export TEST_HORIZONTAL_ZIP_LOCATION=a folder */ -@ExtendWith(ParameterizedTestExtension.class) public class HorizontalPagingTest extends SoakTestBase { private static final String TEST_NAME = "HORIZONTAL"; - private final String protocol; private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true")); - private static final String ZIP_LOCATION = testProperty(null, "ZIP_LOCATION", null); private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000); private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120); private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE,CORE,AMQP"); @@ -79,7 +74,6 @@ public class HorizontalPagingTest extends SoakTestBase { private final int MESSAGE_SIZE; private final int PARALLEL_SENDS; - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String SERVER_NAME_0 = "horizontalPaging"; @@ -99,77 +93,61 @@ public class HorizontalPagingTest extends SoakTestBase { } } - @Parameters(name = "protocol={0}") - public static Collection<Object[]> parameters() { + public static List<String> parseProtocolList() { String[] protocols = PROTOCOL_LIST.split(","); - List<Object[]> parameters = new ArrayList<>(); + List<String> protocolList = new ArrayList<>(); for (String str : protocols) { logger.info("Adding {} to the list for the test", str); - parameters.add(new Object[]{str}); + protocolList.add(str); } - return parameters; + return protocolList; } - public HorizontalPagingTest(String protocol) { - this.protocol = protocol; - DESTINATIONS = TestParameters.testProperty(TEST_NAME, protocol + "_DESTINATIONS", 5); - MESSAGES = TestParameters.testProperty(TEST_NAME, protocol + "_MESSAGES", 1000); - COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, protocol + "_COMMIT_INTERVAL", 100); + public HorizontalPagingTest() { + DESTINATIONS = TestParameters.testProperty(TEST_NAME, "DESTINATIONS", 5); + MESSAGES = TestParameters.testProperty(TEST_NAME, "MESSAGES", 1000); + COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, "COMMIT_INTERVAL", 100); // if 0 will use AUTO_ACK - RECEIVE_COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, protocol + "_RECEIVE_COMMIT_INTERVAL", 100); - MESSAGE_SIZE = TestParameters.testProperty(TEST_NAME, protocol + "_MESSAGE_SIZE", 10_000); - PARALLEL_SENDS = TestParameters.testProperty(TEST_NAME, protocol + "_PARALLEL_SENDS", 5); + RECEIVE_COMMIT_INTERVAL = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT_INTERVAL", 100); + MESSAGE_SIZE = TestParameters.testProperty(TEST_NAME, "MESSAGE_SIZE", 10_000); + PARALLEL_SENDS = TestParameters.testProperty(TEST_NAME, "PARALLEL_SENDS", 5); } Process serverProcess; - boolean unzipped = false; - - private String getZipName() { - return "horizontal-" + protocol + "-" + DESTINATIONS + "-" + MESSAGES + "-" + MESSAGE_SIZE + ".zip"; - } - @BeforeEach public void before() throws Exception { assumeTrue(TEST_ENABLED); cleanupData(SERVER_NAME_0); - boolean useZip = ZIP_LOCATION != null; - String zipName = getZipName(); - File zipFile = useZip ? new File(ZIP_LOCATION + "/" + zipName) : null; - - if (ZIP_LOCATION != null && zipFile.exists()) { - unzipped = true; - unzip(zipFile, new File(getServerLocation(SERVER_NAME_0))); - } - serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT); } - - @TestTemplate + @Test public void testHorizontal() throws Exception { - ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Collection<String> protocolList = parseProtocolList(); AtomicInteger errors = new AtomicInteger(0); - ExecutorService service = Executors.newFixedThreadPool(DESTINATIONS); + ExecutorService service = Executors.newFixedThreadPool(DESTINATIONS * protocolList.size()); runAfter(service::shutdownNow); - if (!unzipped) { - Connection connection = factory.createConnection(); - runAfter(connection::close); + String text = RandomUtil.randomAlphaNumericString(MESSAGE_SIZE); - String text = RandomUtil.randomAlphaNumericString(MESSAGE_SIZE); + ReusableLatch latchDone = new ReusableLatch(0); - ReusableLatch latchDone = new ReusableLatch(0); + for (String protocol : protocolList) { + String protocolUsed = protocol; + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + runAfter(connection::close); for (int i = 0; i < DESTINATIONS; i++) { latchDone.countUp(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("queue_" + i); + Queue queue = session.createQueue("queue_" + i + protocolUsed); service.execute(() -> { try { logger.info("*******************************************************************************************************************************\ndestination {}", queue.getQueueName()); @@ -193,87 +171,81 @@ public class HorizontalPagingTest extends SoakTestBase { latchDone.countDown(); } }); - - if ((i + 1) % PARALLEL_SENDS == 0) { - latchDone.await(); - } } - latchDone.await(); - - connection.close(); - - - killServer(serverProcess); } + assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES)); - - if (ZIP_LOCATION != null && !unzipped) { - String fileName = getZipName(); - zip(new File(ZIP_LOCATION, fileName), new File(getServerLocation(SERVER_NAME_0))); - } + killServer(serverProcess); serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT); - Connection connectionConsumer = factory.createConnection(); - - runAfter(connectionConsumer::close); - AtomicInteger completedFine = new AtomicInteger(0); - for (int i = 0; i < DESTINATIONS; i++) { - int destination = i; - service.execute(() -> { - try { - Session sessionConsumer; + for (String protocol : protocolList) { + latchDone.countUp(); + String protocolUsed = protocol; - if (RECEIVE_COMMIT_INTERVAL <= 0) { - sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); - } else { - sessionConsumer = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED); - } + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connectionConsumer = factory.createConnection(); + runAfter(connectionConsumer::close); - MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue("queue_" + destination)); - for (int m = 0; m < MESSAGES; m++) { - TextMessage message = (TextMessage) messageConsumer.receive(50_000); - if (message == null) { - m--; - continue; - } + for (int i = 0; i < DESTINATIONS; i++) { + int destination = i; + service.execute(() -> { + try { + Session sessionConsumer; - // The sending commit interval here will be used for printing - if (PRINT_INTERVAL > 0 && m % PRINT_INTERVAL == 0) { - logger.info("Destination {} received {} {} messages", destination, m, protocol); + if (RECEIVE_COMMIT_INTERVAL <= 0) { + sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); + } else { + sessionConsumer = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED); } - assertEquals(m, message.getIntProperty("m")); + MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue("queue_" + destination + protocolUsed)); + for (int m = 0; m < MESSAGES; m++) { + TextMessage message = (TextMessage) messageConsumer.receive(50_000); + if (message == null) { + m--; + continue; + } + + // The sending commit interval here will be used for printing + if (PRINT_INTERVAL > 0 && m % PRINT_INTERVAL == 0) { + logger.info("Destination {} received {} {} messages", destination, m, protocol); + } + + assertEquals(m, message.getIntProperty("m")); + + if (RECEIVE_COMMIT_INTERVAL > 0 && (m + 1) % RECEIVE_COMMIT_INTERVAL == 0) { + sessionConsumer.commit(); + } + } - if (RECEIVE_COMMIT_INTERVAL > 0 && (m + 1) % RECEIVE_COMMIT_INTERVAL == 0) { + if (RECEIVE_COMMIT_INTERVAL > 0) { sessionConsumer.commit(); } - } - if (RECEIVE_COMMIT_INTERVAL > 0) { - sessionConsumer.commit(); - } + completedFine.incrementAndGet(); - completedFine.incrementAndGet(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latchDone.countDown(); + } + }); + } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - errors.incrementAndGet(); - } - }); + connectionConsumer.start(); } - connectionConsumer.start(); + assertTrue(latchDone.await(TIMEOUT_MINUTES, TimeUnit.MINUTES)); service.shutdown(); assertTrue(service.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES), "Test Timed Out"); assertEquals(0, errors.get()); - assertEquals(DESTINATIONS, completedFine.get()); - - connectionConsumer.close(); + assertEquals(DESTINATIONS * protocolList.size(), completedFine.get()); } } diff --git a/tests/soak-tests/src/test/scripts/longrun-parameters.sh b/tests/soak-tests/src/test/scripts/longrun-parameters.sh index 5ab2705f9a..c8b9aea785 100755 --- a/tests/soak-tests/src/test/scripts/longrun-parameters.sh +++ b/tests/soak-tests/src/test/scripts/longrun-parameters.sh @@ -30,26 +30,12 @@ export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000 export TEST_HORIZONTAL_TIMEOUT_MINUTES=120 export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP -export TEST_HORIZONTAL_CORE_DESTINATIONS=20 -export TEST_HORIZONTAL_CORE_MESSAGES=5000 -export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=1000 -export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=20 - -export TEST_HORIZONTAL_AMQP_DESTINATIONS=20 -export TEST_HORIZONTAL_AMQP_MESSAGES=1000 -export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100 -export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10 - -export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=20 -export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000 -export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100 -export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10 +export TEST_HORIZONTAL_DESTINATIONS=20 +export TEST_HORIZONTAL_MESSAGES=5000 +export TEST_HORIZONTAL_COMMIT_INTERVAL=1000 +export TEST_HORIZONTAL_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_PARALLEL_SENDS=20 export TEST_FLOW_SERVER_START_TIMEOUT=300000 export TEST_FLOW_TIMEOUT_MINUTES=120 diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh index f53e46b6e3..bc1a6afa8c 100755 --- a/tests/soak-tests/src/test/scripts/parameters.sh +++ b/tests/soak-tests/src/test/scripts/parameters.sh @@ -29,26 +29,12 @@ export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000 export TEST_HORIZONTAL_TIMEOUT_MINUTES=120 export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP -export TEST_HORIZONTAL_CORE_DESTINATIONS=20 -export TEST_HORIZONTAL_CORE_MESSAGES=5000 -export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=1000 -export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=20 - -export TEST_HORIZONTAL_AMQP_DESTINATIONS=20 -export TEST_HORIZONTAL_AMQP_MESSAGES=1000 -export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100 -export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10 - -export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=20 -export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000 -export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100 -export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0 -export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000 -export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10 +export TEST_HORIZONTAL_DESTINATIONS=20 +export TEST_HORIZONTAL_MESSAGES=5000 +export TEST_HORIZONTAL_COMMIT_INTERVAL=1000 +export TEST_HORIZONTAL_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_PARALLEL_SENDS=20 export TEST_FLOW_SERVER_START_TIMEOUT=300000 export TEST_FLOW_TIMEOUT_MINUTES=120 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
