This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 02ad299e12f379cb053db8a6ab18d7db0ebfa54d Author: Clebert Suconic <[email protected]> AuthorDate: Fri Jan 10 17:10:28 2025 -0500 ARTEMIS-5232 Running all possible permutations with a single server execution on TransferTest --- .../artemis/cli/commands/messages/Transfer.java | 4 + .../artemis/tests/smoke/transfer/TransferTest.java | 202 +++++++++------------ 2 files changed, 93 insertions(+), 113 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java index f548224b13..5fb376c961 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java @@ -337,6 +337,10 @@ public class Transfer extends InputAbstract { context.out.println("Connection brokerURL = " + sourceURL); + return doTransfer(context); + } + + private int doTransfer(ActionContext context) throws Exception { ConnectionFactory sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID); Connection sourceConnection = sourceConnectionFactory.createConnection(); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/transfer/TransferTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/transfer/TransferTest.java index ddb33dc7ae..b918c24b83 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/transfer/TransferTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/transfer/TransferTest.java @@ -30,66 +30,30 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import java.io.File; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; -import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; -import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.Run; +import org.apache.activemq.artemis.cli.commands.messages.Transfer; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; -import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@ExtendWith(ParameterizedTestExtension.class) public class TransferTest extends SmokeTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String SERVER_NAME_0 = "transfer1"; public static final String SERVER_NAME_1 = "transfer2"; private static final int NUMBER_OF_MESSAGES = 200; private static final int PARTIAL_MESSAGES = 10; - String sourceTransferProtocol = "amqp"; - String targetTransferProtocol = "amqp"; - String senderProtocol = "amqp"; - String consumerProtocol = "amqp"; - - /* - <execution> - <phase>test-compile</phase> - <id>create-transfer-1</id> - <goals> - <goal>create</goal> - </goals> - <configuration> - <!-- this makes it easier in certain envs --> - <allowAnonymous>true</allowAnonymous> - <user>admin</user> - <password>admin</password> - <noWeb>true</noWeb> - <instance>${basedir}/target/transfer1</instance> - </configuration> - </execution> - <execution> - <phase>test-compile</phase> - <id>create-transfer-2</id> - <goals> - <goal>create</goal> - </goals> - <configuration> - <!-- this makes it easier in certain envs --> - <allowAnonymous>true</allowAnonymous> - <user>admin</user> - <password>admin</password> - <noWeb>true</noWeb> - <portOffset>100</portOffset> - <instance>${basedir}/target/transfer2</instance> - </configuration> - </execution> - - */ @BeforeAll @@ -104,30 +68,26 @@ public class TransferTest extends SmokeTestBase { { HelperCreate cliCreateServer = helperCreate(); cliCreateServer.setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(server0Location); + cliCreateServer.addArgs("--disable-persistence"); cliCreateServer.createServer(); } { HelperCreate cliCreateServer = helperCreate(); cliCreateServer.setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(server1Location).setPortOffset(100); + cliCreateServer.addArgs("--disable-persistence"); cliCreateServer.createServer(); } } - - public TransferTest(String sender, String consumer, String source, String target) { - this.senderProtocol = sender; - this.consumerProtocol = consumer; - this.sourceTransferProtocol = source; - this.targetTransferProtocol = target; + public TransferTest() { } - @Parameters(name = "sender={0}, consumer={1}, sourceOnTransfer={2}, targetOnTransfer={3}") - public static Collection<Object[]> getParams() { + public static Collection<String[]> mixProtocolOptions() { String[] protocols = new String[]{"core", "amqp"}; - ArrayList<Object[]> parameters = new ArrayList<>(); + ArrayList<String[]> parameters = new ArrayList<>(); for (int i = 0; i < protocols.length; i++) { for (int j = 0; j < protocols.length; j++) { @@ -142,11 +102,11 @@ public class TransferTest extends SmokeTestBase { return parameters; } - private ConnectionFactory createConsumerCF() { + private ConnectionFactory createConsumerCF(String consumerProtocol) { return CFUtil.createConnectionFactory(consumerProtocol, "tcp://localhost:61716"); } - private ConnectionFactory createSenderCF() { + private ConnectionFactory createSenderCF(String senderProtocol) { return CFUtil.createConnectionFactory(senderProtocol, "tcp://localhost:61616"); } @@ -159,30 +119,66 @@ public class TransferTest extends SmokeTestBase { startServer(SERVER_NAME_1, 100, 30000); } - @TestTemplate - public void testTransferSimpleQueueCopy() throws Exception { - internalTransferSimpleQueue(false); + @Test + public void testTryAllPermutations() throws Exception { + + Collection<String[]> options = mixProtocolOptions(); + int iteration = 0; + for (String[] option : options) { + logger.info("{} {} {} {} - iteration = {}", option[0], option[1], option[2], option[3], iteration); + internalTransferSimpleQueue("queue_a" + iteration, false, option[0], option[1], option[2], option[3]); + internalTransferSimpleQueue("queue_b" + iteration, true, option[0], option[1], option[2], option[3]); + testDurableSharedSubscrition("topic_c" + iteration, "queue_c" + iteration, option[0], option[1], option[2], option[3]); + testSharedSubscription("topic_d" + iteration, "queue_d" + iteration, option[0], option[1], option[2], option[3]); + testDurableConsumer("topic_e" + iteration, "queue_e" + iteration, option[0], option[1], option[2], option[3]); + iteration++; + } } - @TestTemplate - public void testTransferSimpleQueue() throws Exception { - internalTransferSimpleQueue(true); - } + private static void callTransferQueue(String targetURL, String sourceQueue, String sourceTopic, String sharedDurableSubscription, String sharedSubscription, String durableConsumer, String clientID, String targetQueue, String sourceProtocol, String transferProtocol, boolean copy) throws Exception { - public String getQueueName() { - return getName(); - } + Run.setEmbedded(true); // Telling the CLI to not use System.exit + + Transfer transfer = new Transfer(); + File artemisInstance = getFileServerLocation(SERVER_NAME_0); + File etc = new File(artemisInstance, "etc"); + transfer.setHomeValues(HelperCreate.getHome(ARTEMIS_HOME_PROPERTY), getFileServerLocation(SERVER_NAME_0), etc); + transfer.setTargetURL(targetURL); + if (sourceQueue != null) { + transfer.setSourceQueue(sourceQueue); + } + if (sourceTopic != null) { + transfer.setSourceTopic(sourceTopic); + } + if (sharedDurableSubscription != null) { + transfer.setSharedDurableSubscription(sharedDurableSubscription); + } + if (sharedSubscription != null) { + transfer.setSharedSubscription(sharedSubscription); + } + if (durableConsumer != null) { + transfer.setDurableConsumer(durableConsumer); + } + if (clientID != null) { + transfer.setSourceClientID(clientID); + } + transfer.setReceiveTimeout(100); + transfer.setTargetQueue(targetQueue); + transfer.setSourceProtocol(sourceProtocol); + transfer.setTargetProtocol(transferProtocol); + transfer.setCopy(copy); + transfer.execute(new ActionContext()); - public String getTopicName() { - return "Topic" + getName(); } - private void internalTransferSimpleQueue(boolean copy) throws Exception { - ConnectionFactory factory = createSenderCF(); + private void internalTransferSimpleQueue(String queueName, boolean copy, + String senderProtocol, String consumerProtocol, + String sourceTransferProtocol, String targetTransferProtocol) throws Exception { + ConnectionFactory factory = createSenderCF(senderProtocol); Connection connection = factory.createConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue(getQueueName()); + Queue queue = session.createQueue(queueName); MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -191,29 +187,14 @@ public class TransferTest extends SmokeTestBase { } session.commit(); + callTransferQueue("tcp://localhost:61716", queueName, null, null, null, null, null, queueName, sourceTransferProtocol, targetTransferProtocol, copy); - String[] argsArray = new String[]{"transfer", "--target-url", "tcp://localhost:61716", "--source-queue", getQueueName(), "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0"}; - - if (copy) { - ArrayList<String> copyArgs = new ArrayList<>(); - for (String a : argsArray) { - copyArgs.add(a); - } - if (copy) { - copyArgs.add("--copy"); - } - argsArray = copyArgs.toArray(new String[copyArgs.size()]); - } - - Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", argsArray); - transferProcess.waitFor(); - - ConnectionFactory factoryTarget = createConsumerCF(); + ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol); Connection connectionTarget = factoryTarget.createConnection(); connectionTarget.start(); Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED); - Queue queueTarget = sessionTarget.createQueue(getQueueName()); + Queue queueTarget = sessionTarget.createQueue(queueName); MessageConsumer consumer = sessionTarget.createConsumer(queueTarget); for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { @@ -243,13 +224,12 @@ public class TransferTest extends SmokeTestBase { connectionTarget.close(); } - @TestTemplate - public void testDurableSharedSubscrition() throws Exception { - ConnectionFactory factory = createSenderCF(); + public void testDurableSharedSubscrition(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception { + ConnectionFactory factory = createSenderCF(senderProtocol); Connection connection = factory.createConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(getTopicName()); + Topic topic = session.createTopic(topicName); MessageConsumer subscription = session.createSharedDurableConsumer(topic, "testSubs"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -261,15 +241,14 @@ public class TransferTest extends SmokeTestBase { session.commit(); subscription.close(); - Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-durable-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose"); - transferProcess.waitFor(); + callTransferQueue("tcp://localhost:61716", null, topicName, "testSubs", null, null, null, queueName, sourceTransferProtocol, targetTransferProtocol, false); - ConnectionFactory factoryTarget = createConsumerCF(); + ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol); Connection connectionTarget = factoryTarget.createConnection(); connectionTarget.start(); Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED); - Queue queueTarget = sessionTarget.createQueue(getQueueName()); + Queue queueTarget = sessionTarget.createQueue(queueName); MessageConsumer consumer = sessionTarget.createConsumer(queueTarget); for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { @@ -281,17 +260,17 @@ public class TransferTest extends SmokeTestBase { assertNull(consumer.receiveNoWait()); + session.unsubscribe("testSubs"); connection.close(); connectionTarget.close(); } - @TestTemplate - public void testSharedSubscrition() throws Exception { - ConnectionFactory factory = createSenderCF(); + public void testSharedSubscription(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception { + ConnectionFactory factory = createSenderCF(senderProtocol); Connection connection = factory.createConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(getTopicName()); + Topic topic = session.createTopic(topicName); MessageConsumer subscription = session.createSharedConsumer(topic, "testSubs"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -305,8 +284,7 @@ public class TransferTest extends SmokeTestBase { session.commit(); - Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0", "--verbose"); - transferProcess.waitFor(); + callTransferQueue("tcp://localhost:61716", null, topicName, null, "testSubs", null, null, queueName, sourceTransferProtocol, targetTransferProtocol, false); // this test is a bit tricky as the subscription would be removed when the consumer is gone... // I'm adding a test for completion only @@ -314,12 +292,12 @@ public class TransferTest extends SmokeTestBase { // which will not receive all the messages as some messages will be in delivering mode subscription.close(); - ConnectionFactory factoryTarget = createConsumerCF(); + ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol); Connection connectionTarget = factoryTarget.createConnection(); connectionTarget.start(); Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED); - Queue queueTarget = sessionTarget.createQueue(getQueueName()); + Queue queueTarget = sessionTarget.createQueue(queueName); MessageConsumer consumer = sessionTarget.createConsumer(queueTarget); // we are keeping a non durable subscription so the temporary queue still up @@ -335,14 +313,13 @@ public class TransferTest extends SmokeTestBase { connectionTarget.close(); } - @TestTemplate - public void testDurableConsumer() throws Exception { - ConnectionFactory factory = createSenderCF(); + public void testDurableConsumer(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception { + ConnectionFactory factory = createSenderCF(senderProtocol); Connection connection = factory.createConnection(); connection.setClientID("test"); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(getTopicName()); + Topic topic = session.createTopic(topicName); MessageConsumer subscription = session.createDurableConsumer(topic, "testSubs"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -355,15 +332,14 @@ public class TransferTest extends SmokeTestBase { connection.close(); subscription.close(); - Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--source-client-id", "test", "--durable-consumer", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose", "--silent"); - transferProcess.waitFor(); + callTransferQueue("tcp://localhost:61716", null, topicName, null, null, "testSubs", "test", queueName, sourceTransferProtocol, targetTransferProtocol, false); - ConnectionFactory factoryTarget = createConsumerCF(); + ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol); Connection connectionTarget = factoryTarget.createConnection(); connectionTarget.start(); Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED); - Queue queueTarget = sessionTarget.createQueue(getQueueName()); + Queue queueTarget = sessionTarget.createQueue(queueName); MessageConsumer consumer = sessionTarget.createConsumer(queueTarget); // we are keeping a non durable subscription so the temporary queue still up --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
