This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch ARTEMIS-5801 in repository https://gitbox.apache.org/repos/asf/artemis.git
commit 27a57d0d889dd7e16b084ce3b2597904a1b520d5 Author: Timothy Bish <[email protected]> AuthorDate: Tue Dec 9 13:52:15 2025 -0500 ARTEMIS-5801 Add a subscription name option to the consumer CLI tool Adds option 'subscriptionName' to the consumer CLI options to allow consuming from an existing durable subscription not created by the CLI --- .../artemis/cli/commands/messages/Consumer.java | 27 +++++++++++- .../cli/commands/messages/ConsumerThread.java | 7 ++- .../apache/activemq/cli/test/CliConsumerTest.java | 50 +++++++++++++++++++++- 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index ef81f1215b..1965d893f6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -50,6 +50,9 @@ public class Consumer extends DestAbstract { @Option(names = "--data", description = "Serialize the messages to the specified file as they are consumed.") String file; + @Option(names = "--subscriptionName", description = "The subscription name to use for a durable consumer.") + String subscriptionName; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -78,6 +81,15 @@ public class Consumer extends DestAbstract { serializer.start(); } + if (subscriptionName != null) { + if (threads != 1) { + context.err.println("Error: Cannot assign a subscription name when multiple threads are also configured."); + return null; + } else { + context.out.println("Consumer:: subscription name = " + subscriptionName); + } + } + ConnectionFactory factory = createConnectionFactory(); try (Connection connection = factory.createConnection()) { @@ -92,7 +104,11 @@ public class Consumer extends DestAbstract { } Destination dest = getDestination(session); - threadsArray[i] = new ConsumerThread(session, dest, i, context); + if (subscriptionName == null) { + threadsArray[i] = new ConsumerThread(session, dest, i, context); + } else { + threadsArray[i] = new ConsumerThread(session, dest, subscriptionName, context); + } threadsArray[i] .setVerbose(verbose) @@ -191,4 +207,13 @@ public class Consumer extends DestAbstract { this.file = file; return this; } + + public String getSubscriptionName() { + return subscriptionName; + } + + public Consumer setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + return this; + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index 391cded2b1..2bdac84d76 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -57,7 +57,12 @@ public class ConsumerThread extends Thread { MessageListener listener; public ConsumerThread(Session session, Destination destination, int threadNr, ActionContext context) { - super("Consumer " + destination.toString() + ", thread=" + threadNr); + this(session, destination, "Consumer " + destination.toString() + ", thread=" + threadNr, context); + } + + public ConsumerThread(Session session, Destination destination, String subscriptionName, ActionContext context) { + super(subscriptionName); + this.destination = destination; this.session = session; this.context = context; diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java index f43e1f5d2b..4484acec28 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java @@ -17,8 +17,9 @@ package org.apache.activemq.cli.test; import javax.jms.Connection; - +import javax.jms.Session; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.cli.commands.messages.Consumer; import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -29,6 +30,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class CliConsumerTest extends CliTestBase { @@ -113,4 +115,50 @@ public class CliConsumerTest extends CliTestBase { Wait.assertEquals(0L, () -> server.locateQueue(address).getMessageCount(), 2000, 50); } + + @Test + public void testConsumeFromExistingDurableSubscription() throws Exception { + final String address = "test-topic"; + final String addressPrefix = "topic://"; + final String clientID = "test-client"; + final String subscriptionName = "test-sub"; + final String credentials = "admin"; + final TestActionContext context = new TestActionContext(); + + // Creates the durable subscription to consumer from using the CLI tool. + try (Connection connection = cf.createConnection(credentials, credentials)) { + + connection.setClientID(clientID); + connection.start(); + + final Session session = createSession(connection); + + session.createDurableConsumer(session.createTopic(address), subscriptionName); + } + + produceMessages(addressPrefix + address, TEST_MESSAGE_COUNT); + + server.addressQuery(SimpleString.of(address)); + + final String subscriptionQueueName = server.bindingQuery(SimpleString.of(address)).getQueueNames().get(0).toString(); + assertNotNull(subscriptionQueueName); + final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName); + Wait.assertEquals((long) TEST_MESSAGE_COUNT, () -> subscriptionQueue.getMessageCount(), 2000, 50); + + // Consume from the durable subscription with messages added. + new Consumer() + .setSubscriptionName(subscriptionName) + .setReceiveTimeout(100) + .setBreakOnNull(true) + .setDurable(true) + .setMessageCount(TEST_MESSAGE_COUNT) + .setDestination(addressPrefix + address) + .setClientID(clientID) + .setUser(credentials) + .setPassword(credentials) + .execute(context); + + Wait.assertTrue(() -> context.getStdout().contains("subscription name"), 2000, 100); + Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000, 50); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
