This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ccc4c006d9 ARTEMIS-5801 Add a subscription name option to the consumer
CLI tool
ccc4c006d9 is described below
commit ccc4c006d9567d5e2767052957ce7d3edb5e6c89
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Dec 9 13:52:15 2025 -0500
ARTEMIS-5801 Add a subscription name option to the consumer CLI tool
Adds option 'subscriptionName' to the consumer CLI options to allow
consuming from an existing durable subscription not created by the CLI
---
.../artemis/cli/commands/messages/Consumer.java | 27 +++++++++++-
.../cli/commands/messages/ConsumerThread.java | 7 ++-
.../apache/activemq/cli/test/CliConsumerTest.java | 50 +++++++++++++++++++++-
3 files changed, 81 insertions(+), 3 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index ef81f1215b..1965d893f6 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -50,6 +50,9 @@ public class Consumer extends DestAbstract {
@Option(names = "--data", description = "Serialize the messages to the
specified file as they are consumed.")
String file;
+ @Option(names = "--subscriptionName", description = "The subscription name
to use for a durable consumer.")
+ String subscriptionName;
+
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
@@ -78,6 +81,15 @@ public class Consumer extends DestAbstract {
serializer.start();
}
+ if (subscriptionName != null) {
+ if (threads != 1) {
+ context.err.println("Error: Cannot assign a subscription name when
multiple threads are also configured.");
+ return null;
+ } else {
+ context.out.println("Consumer:: subscription name = " +
subscriptionName);
+ }
+ }
+
ConnectionFactory factory = createConnectionFactory();
try (Connection connection = factory.createConnection()) {
@@ -92,7 +104,11 @@ public class Consumer extends DestAbstract {
}
Destination dest = getDestination(session);
- threadsArray[i] = new ConsumerThread(session, dest, i, context);
+ if (subscriptionName == null) {
+ threadsArray[i] = new ConsumerThread(session, dest, i, context);
+ } else {
+ threadsArray[i] = new ConsumerThread(session, dest,
subscriptionName, context);
+ }
threadsArray[i]
.setVerbose(verbose)
@@ -191,4 +207,13 @@ public class Consumer extends DestAbstract {
this.file = file;
return this;
}
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public Consumer setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ return this;
+ }
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
index 391cded2b1..2bdac84d76 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
@@ -57,7 +57,12 @@ public class ConsumerThread extends Thread {
MessageListener listener;
public ConsumerThread(Session session, Destination destination, int
threadNr, ActionContext context) {
- super("Consumer " + destination.toString() + ", thread=" + threadNr);
+ this(session, destination, "Consumer " + destination.toString() + ",
thread=" + threadNr, context);
+ }
+
+ public ConsumerThread(Session session, Destination destination, String
subscriptionName, ActionContext context) {
+ super(subscriptionName);
+
this.destination = destination;
this.session = session;
this.context = context;
diff --git
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
index f43e1f5d2b..4484acec28 100644
---
a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
+++
b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
@@ -17,8 +17,9 @@
package org.apache.activemq.cli.test;
import javax.jms.Connection;
-
+import javax.jms.Session;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CliConsumerTest extends CliTestBase {
@@ -113,4 +115,50 @@ public class CliConsumerTest extends CliTestBase {
Wait.assertEquals(0L, () ->
server.locateQueue(address).getMessageCount(), 2000, 50);
}
+
+ @Test
+ public void testConsumeFromExistingDurableSubscription() throws Exception {
+ final String address = "test-topic";
+ final String addressPrefix = "topic://";
+ final String clientID = "test-client";
+ final String subscriptionName = "test-sub";
+ final String credentials = "admin";
+ final TestActionContext context = new TestActionContext();
+
+ // Creates the durable subscription to consume from using the CLI tool.
+ try (Connection connection = cf.createConnection(credentials,
credentials)) {
+
+ connection.setClientID(clientID);
+ connection.start();
+
+ final Session session = createSession(connection);
+
+ session.createDurableConsumer(session.createTopic(address),
subscriptionName);
+ }
+
+ produceMessages(addressPrefix + address, TEST_MESSAGE_COUNT);
+
+ server.addressQuery(SimpleString.of(address));
+
+ final String subscriptionQueueName =
server.bindingQuery(SimpleString.of(address)).getQueueNames().get(0).toString();
+ assertNotNull(subscriptionQueueName);
+ final org.apache.activemq.artemis.core.server.Queue subscriptionQueue =
server.locateQueue(subscriptionQueueName);
+ Wait.assertEquals((long) TEST_MESSAGE_COUNT, () ->
subscriptionQueue.getMessageCount(), 2000, 50);
+
+ // Consume from the durable subscription with messages added.
+ new Consumer()
+ .setSubscriptionName(subscriptionName)
+ .setReceiveTimeout(100)
+ .setBreakOnNull(true)
+ .setDurable(true)
+ .setMessageCount(TEST_MESSAGE_COUNT)
+ .setDestination(addressPrefix + address)
+ .setClientID(clientID)
+ .setUser(credentials)
+ .setPassword(credentials)
+ .execute(context);
+
+ Wait.assertTrue(() -> context.getStdout().contains("subscription name"),
2000, 100);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000,
50);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]